很简单的 Kubernetes Device Plugin 原理讲解
简介
Kubernetes Device Plugin 是官方提供的一种扩展系统可用设备类型的方法。通过 device plugin,可以将一种设备类型“接入” Kubernetes,允许调度器根据需求为容器分配这些设备,并由 kubelet 和 CRI 执行实际的挂载。该特性在 v1.10 成为 beta,目前尚在完善。Device Plugin 体系仅包含对文件的操作,比如将 /dev 目录下的指定设备或者一些指定文件挂载到容器的指定目录下,并不包含驱动配置等操作,因此容器需要在镜像中自行适配相关设备的驱动。
设备插件
FEATURE STATE: Kubernetes v1.10 [beta]
Kubernetes 提供了一个 设备插件框架,你可以用它来将系统硬件资源发布到 Kubelet。
供应商可以实现设备插件,由你手动部署或作为 DaemonSet 来部署,而不必定制 Kubernetes 本身的代码。目标设备包括 GPU、高性能 NIC、FPGA、 InfiniBand 适配器以及其他类似的、可能需要特定于供应商的初始化和设置的计算资源。
Device Plugin 的注册和使用流程
以Nvidia Device Plugin 为例
注册部分
- 首先,Device Plugin 通过 Kubelet 提供的 RPC 接口 Register 进行注册。
- // Register registers the device plugin for the given resourceName with Kubelet.
- func (m *NvidiaDevicePlugin) Register() error {
- conn, err := m.dial(pluginapi.KubeletSocket, 5*time.Second)
- if err != nil {
- return err
- }
- defer conn.Close()
- client := pluginapi.NewRegistrationClient(conn)
- reqt := &pluginapi.RegisterRequest{
- Version: pluginapi.Version,
- Endpoint: path.Base(m.socket),
- ResourceName: m.resourceName,
- Options: &pluginapi.DevicePluginOptions{
- GetPreferredAllocationAvailable: (m.allocatePolicy != nil),
- },
- }
- _, err = client.Register(context.Background(), reqt)
- if err != nil {
- return err
- }
- return nil
- }
go
在注册时,Device Plugin 需要提供的参数包括
- 自身的版本号
- 自身提供的 RPC 服务监听的 Unix Socket 的地址。这个 Socket 必须放在 /var/lib/kubelet/device_plugins 目录下,Kubelet 重启时会清空这个目录下的 .sock 文件,因此 Device Plugin 需要监控文件的变化,当 Kubelet 重启的时候,重启 RPC 服务重新注册
- 自身托管的资源的名称。比如 nvidia.com/gpu。如果想提供多种可用设备的话,需要为每种资源分别启动一个 RPC Server 去注册。
- 注册的选项,主要包括两个 bool 型变量
- PreStartRequired: 容器启动前是否必须调用 PreStartContainer 方法
- GetPreferredAllocationAvailable: 是否提供优选方法 GetPreferredAllocation,若不提供,则由调度器直接决定为容器分配哪些设备 有一点需要注意, 插件并非必须为 GetPreferredAllocation() 或 PreStartContainer() 提供有用 的实现逻辑,调用 GetDevicePluginOptions() 时所返回的 DevicePluginOptions 消息中应该设置这些调用是否可用。kubelet 在真正调用这些函数之前,总会调用 GetDevicePluginOptions() 来查看是否存在这些可选的函数。 此外,Device Plugin 需要感知自己的 .sock 有没有被删除,以判断 Kubelet 有没有重启。
- Kubelet 根据注册信息,向 Device Plugin 发起 List-Watch 请求,获取可用设备的状态。
- // ListAndWatch lists devices and update that list according to the health status
- func (m *NvidiaDevicePlugin) ListAndWatch(e *pluginapi.Empty, s pluginapi.DevicePlugin_ListAndWatchServer) error {
- s.Send(&pluginapi.ListAndWatchResponse{Devices: m.apiDevices()})
- for {
- select {
- case <-m.stop:
- return nil
- case d := <-m.health:
- // FIXME: there is no way to recover from the Unhealthy state.
- d.Health = pluginapi.Unhealthy
- log.Printf("'%s' device marked unhealthy: %s", m.resourceName, d.ID)
- s.Send(&pluginapi.ListAndWatchResponse{Devices: m.apiDevices()})
- }
- }
- }
go
这里,NVIDIA Device Plugin 会有一个定时检查设备健康程度的 Goroutine,如果发现设备不健康,就会通过 health 这个 Channel 传递不健康设备的指针,然后将这个设备设置成不健康。注意,NVIDIA Device Plugin 没有 Recover 逻辑,一个设备一旦被设置为不健康了,在 Device Plugin 重启之前,这个设备将一直处于不可用状态。 对于每个设备,Kubelet 只关心设备的 ID,健康状态,以及绑核(Numa核)逻辑。
- // E.g:
- // struct Device {
- // ID: "GPU-fef8089b-4820-abfc-e83e-94318197576e",
- // Health: "Healthy",
- // Topology:
- // Node:
- // ID: 1
- //}
- type Device struct {
- // A unique ID assigned by the device plugin used
- // to identify devices during the communication
- // Max length of this field is 63 characters
- ID string `protobuf:"bytes,1,opt,name=ID,json=iD,proto3" json:"ID,omitempty"`
- // Health of the device, can be healthy or unhealthy, see constants.go
- Health string `protobuf:"bytes,2,opt,name=health,proto3" json:"health,omitempty"`
- // Topology for device
- Topology *TopologyInfo `protobuf:"bytes,3,opt,name=topology,proto3" json:"topology,omitempty"`
- XXX_NoUnkeyedLiteral struct{} `json:"-"`
- XXX_sizecache int32 `json:"-"`
- }
go
- Kubelet 通过 List-Watch 拿到可用设备列表后,会 Patch 更新 Node Allocatable 信息,并在本地文件 /var/lib/kubelet/device-plugins/kubelet_internal_checkpoint 中更新一份可用设备列表的缓存。
- Scheduler 通过 List-Watch 获取 Node 可用的设备,并在调度时根据 Pod Spec 进行分配。
分配部分
- Scheduler 完成调度后,会将 Pod Bind 到节点上,此时 Kubelet 会通过 List-Watch 感知到这一事件,并开始调用 Device Plugin 来为 Pod 准备设备的挂载等操作。
- 如前所述,Kubelet 先调用 GetDevicePluginOptions 获取当前 Device Plugin 提供的能力,比如是否可以优选,是否可以在容器启动前执行一些逻辑。
- 如果配置了优选,那么 Kubelet 会调用 Device Plugin 提供的 GetPreferredAllocation 方法。发送一系列 ContainerPreferredAllocationRequest,每个请求包含可用的设备列表和必须包含的设备列表。由 Device Plugin 返回根据策略选出的设备列表。
- // PreferredAllocationRequest is passed via a call to GetPreferredAllocation()
- // at pod admission time. The device plugin should take the list of
- // `available_deviceIDs` and calculate a preferred allocation of size
- // 'allocation_size' from them, making sure to include the set of devices
- // listed in 'must_include_deviceIDs'.
- type PreferredAllocationRequest struct {
- ContainerRequests []*ContainerPreferredAllocationRequest `protobuf:"bytes,1,rep,name=container_requests,json=containerRequests,proto3" json:"container_requests,omitempty"`
- XXX_NoUnkeyedLiteral struct{} `json:"-"`
- XXX_sizecache int32 `json:"-"`
- }
- type ContainerPreferredAllocationRequest struct {
- // List of available deviceIDs from which to choose a preferred allocation
- AvailableDeviceIDs []string `protobuf:"bytes,1,rep,name=available_deviceIDs,json=availableDeviceIDs,proto3" json:"available_deviceIDs,omitempty"`
- // List of deviceIDs that must be included in the preferred allocation
- MustIncludeDeviceIDs []string `protobuf:"bytes,2,rep,name=must_include_deviceIDs,json=mustIncludeDeviceIDs,proto3" json:"must_include_deviceIDs,omitempty"`
- // Number of devices to include in the preferred allocation
- AllocationSize int32 `protobuf:"varint,3,opt,name=allocation_size,json=allocationSize,proto3" json:"allocation_size,omitempty"`
- XXX_NoUnkeyedLiteral struct{} `json:"-"`
- XXX_sizecache int32 `json:"-"`
- }
- // PreferredAllocationResponse returns a preferred allocation,
- // resulting from a PreferredAllocationRequest.
- type PreferredAllocationResponse struct {
- ContainerResponses []*ContainerPreferredAllocationResponse `protobuf:"bytes,1,rep,name=container_responses,json=containerResponses,proto3" json:"container_responses,omitempty"`
- XXX_NoUnkeyedLiteral struct{} `json:"-"`
- XXX_sizecache int32 `json:"-"`
- }
- type ContainerPreferredAllocationResponse struct {
- DeviceIDs []string `protobuf:"bytes,1,rep,name=deviceIDs,proto3" json:"deviceIDs,omitempty"`
- XXX_NoUnkeyedLiteral struct{} `json:"-"`
- XXX_sizecache int32 `json:"-"`
- }
- // GetPreferredAllocation returns the preferred allocation from the set of devices specified in the request
- func (m *NvidiaDevicePlugin) GetPreferredAllocation(ctx context.Context, r *pluginapi.PreferredAllocationRequest) (*pluginapi.PreferredAllocationResponse, error) {
- response := &pluginapi.PreferredAllocationResponse{}
- for _, req := range r.ContainerRequests {
- available, err := gpuallocator.NewDevicesFrom(req.AvailableDeviceIDs)
- if err != nil {
- return nil, fmt.Errorf("Unable to retrieve list of available devices: %v", err)
- }
- required, err := gpuallocator.NewDevicesFrom(req.MustIncludeDeviceIDs)
- if err != nil {
- return nil, fmt.Errorf("Unable to retrieve list of required devices: %v", err)
- }
- allocated := m.allocatePolicy.Allocate(available, required, int(req.AllocationSize))
- var deviceIds []string
- for _, device := range allocated {
- deviceIds = append(deviceIds, device.UUID)
- }
- resp := &pluginapi.ContainerPreferredAllocationResponse{
- DeviceIDs: deviceIds,
- }
- response.ContainerResponses = append(response.ContainerResponses, resp)
- }
- return response, nil
- }
go
- Allocate 执行实际的分配行为,要求 Device Plugin 根据分配请求,提供需要注入的环境变量、 Annotation 和挂载行为,这些均仅 CRI 可见,不会体现在 Pod Spec 上。
- // - Allocate is expected to be called during pod creation since allocation
- // failures for any container would result in pod startup failure.
- // - Allocate allows kubelet to exposes additional artifacts in a pod's
- // environment as directed by the plugin.
- // - Allocate allows Device Plugin to run device specific operations on
- // the Devices requested
- type AllocateRequest struct {
- ContainerRequests []*ContainerAllocateRequest `protobuf:"bytes,1,rep,name=container_requests,json=containerRequests,proto3" json:"container_requests,omitempty"`
- XXX_NoUnkeyedLiteral struct{} `json:"-"`
- XXX_sizecache int32 `json:"-"`
- }
- type ContainerAllocateRequest struct {
- DevicesIDs []string `protobuf:"bytes,1,rep,name=devicesIDs,proto3" json:"devicesIDs,omitempty"`
- XXX_NoUnkeyedLiteral struct{} `json:"-"`
- XXX_sizecache int32 `json:"-"`
- }
- // AllocateResponse includes the artifacts that needs to be injected into
- // a container for accessing 'deviceIDs' that were mentioned as part of
- // 'AllocateRequest'.
- // Failure Handling:
- // if Kubelet sends an allocation request for dev1 and dev2.
- // Allocation on dev1 succeeds but allocation on dev2 fails.
- // The Device plugin should send a ListAndWatch update and fail the
- // Allocation request
- type AllocateResponse struct {
- ContainerResponses []*ContainerAllocateResponse `protobuf:"bytes,1,rep,name=container_responses,json=containerResponses,proto3" json:"container_responses,omitempty"`
- XXX_NoUnkeyedLiteral struct{} `json:"-"`
- XXX_sizecache int32 `json:"-"`
- }
- type ContainerAllocateResponse struct {
- // List of environment variable to be set in the container to access one of more devices.
- Envs map[string]string `protobuf:"bytes,1,rep,name=envs,proto3" json:"envs,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"`
- // Mounts for the container.
- Mounts []*Mount `protobuf:"bytes,2,rep,name=mounts,proto3" json:"mounts,omitempty"`
- // Devices for the container.
- Devices []*DeviceSpec `protobuf:"bytes,3,rep,name=devices,proto3" json:"devices,omitempty"`
- // Container annotations to pass to the container runtime
- Annotations map[string]string `protobuf:"bytes,4,rep,name=annotations,proto3" json:"annotations,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"`
- XXX_NoUnkeyedLiteral struct{} `json:"-"`
- XXX_sizecache int32 `json:"-"`
- }
- // Allocate which return list of devices.
- func (m *NvidiaDevicePlugin) Allocate(ctx context.Context, reqs *pluginapi.AllocateRequest) (*pluginapi.AllocateResponse, error) {
- responses := pluginapi.AllocateResponse{}
- for _, req := range reqs.ContainerRequests {
- for _, id := range req.DevicesIDs {
- if !m.deviceExists(id) {
- return nil, fmt.Errorf("invalid allocation request for '%s': unknown device: %s", m.resourceName, id)
- }
- }
- response := pluginapi.ContainerAllocateResponse{}
- uuids := req.DevicesIDs
- deviceIDs := m.deviceIDsFromUUIDs(uuids)
- if deviceListStrategyFlag == DeviceListStrategyEnvvar {
- response.Envs = m.apiEnvs(m.deviceListEnvvar, deviceIDs)
- }
- if deviceListStrategyFlag == DeviceListStrategyVolumeMounts {
- response.Envs = m.apiEnvs(m.deviceListEnvvar, []string{deviceListAsVolumeMountsContainerPathRoot})
- response.Mounts = m.apiMounts(deviceIDs)
- }
- if passDeviceSpecsFlag {
- response.Devices = m.apiDeviceSpecs(nvidiaDriverRootFlag, uuids)
- }
- responses.ContainerResponses = append(responses.ContainerResponses, &response)
- }
- return &responses, nil
- }
go
Device Plugin 所要做的就是构造 ContainerAllocateResponse 响应体,返回 Annotation,Env,Mounts。实际上,对于 NVIDIA Runtime,只需要在容器中配置 NVIDIA_VISIBLE_DEVICES 这个环境变量,在 CRI 中会自动根据该项去挂载对应的 GPU 设备以及相关的 toolkit,如果配置了 NVIDIA_VISIBLE_DEVICES=all,则挂载全部设备。不需要返回 Annotation 和 Mount 信息。
5. 执行启动前逻辑。如果 Device Plugin 在注册时或者 GetDevicePluginOptions 中声明了要求执行启动前逻辑,则 Kubelet 会在 CRI 配置好容器后,在启动前调用 PreStartContainer 逻辑,此时 Pod 依然处于 Pending 状态,直到调用完毕。
Device Plugin 的局限性
- Device Plugin 只能根据分配的结果去做相应的操作,或者在单机层面做一些单机层面的优选行为,却不能介入节点的筛选过程。
- 设备的分配是依据 Index 来决定的,以单个设备为粒度进行分配,不支持按份数拆分。因此, Aliyun 的 GPU 共享插件为了支持按显存分配,将每一 G 显存都设置成了一个单独的设备。
- 所有的 RPC 接口都不提供 Pod 信息,无法感知每一块设备具体是被哪个 Pod 分走了。为此,Aliyun 的做法是同时侵入调度器,在调度时,为 Pod 打上 Annotation,然后在 Device Plugin 中,通过 Kubelet 获取当前节点上 Pending 的 Pod,根据 Annotation 的内容做对应。字节跳动内部为了解决这个问题对 Kubelet 本身做了 Hack,在调用传递的 Context 中,封入了 Pod 的 Name 和 Namespace 信息。该特性有社区在提交 PR,但是两年没合并。
- Device Plugin 鼓励的工作模式是无状态的,希望开发者不要在 Device Plugin 中记录任何的状态信息。
相关评论