很简单的 Kubernetes Device Plugin 原理讲解
简介
Kubernetes Device Plugin 是官方提供的一种扩展系统可用设备类型的方法。通过 device plugin,可以将一种设备类型“接入” Kubernetes,允许调度器根据需求为容器分配这些设备,并由 kubelet 和 CRI 执行实际的挂载。该特性在 v1.10 成为 beta,目前尚在完善。Device Plugin 体系仅包含对文件的操作,比如将 /dev 目录下的指定设备或者一些指定文件挂载到容器的指定目录下,并不包含驱动配置等操作,因此容器需要在镜像中自行适配相关设备的驱动。
设备插件
FEATURE STATE: Kubernetes v1.10 [beta]
Kubernetes 提供了一个 设备插件框架,你可以用它来将系统硬件资源发布到 Kubelet。
供应商可以实现设备插件,由你手动部署或作为 DaemonSet 来部署,而不必定制 Kubernetes 本身的代码。目标设备包括 GPU、高性能 NIC、FPGA、 InfiniBand 适配器以及其他类似的、可能需要特定于供应商的初始化和设置的计算资源。
Device Plugin 的注册和使用流程
以Nvidia Device Plugin 为例
注册部分
- 首先,Device Plugin 通过 Kubelet 提供的 RPC 接口 Register 进行注册。
// Register registers the device plugin for the given resourceName with Kubelet.
func (m *NvidiaDevicePlugin) Register() error {
conn, err := m.dial(pluginapi.KubeletSocket, 5*time.Second)
if err != nil {
return err
}
defer conn.Close()
client := pluginapi.NewRegistrationClient(conn)
reqt := &pluginapi.RegisterRequest{
Version: pluginapi.Version,
Endpoint: path.Base(m.socket),
ResourceName: m.resourceName,
Options: &pluginapi.DevicePluginOptions{
GetPreferredAllocationAvailable: (m.allocatePolicy != nil),
},
}
_, err = client.Register(context.Background(), reqt)
if err != nil {
return err
}
return nil
}
在注册时,Device Plugin 需要提供的参数包括
- 自身的版本号
- 自身提供的 RPC 服务监听的 Unix Socket 的地址。这个 Socket 必须放在 /var/lib/kubelet/device_plugins 目录下,Kubelet 重启时会清空这个目录下的 .sock 文件,因此 Device Plugin 需要监控文件的变化,当 Kubelet 重启的时候,重启 RPC 服务重新注册
- 自身托管的资源的名称。比如 nvidia.com/gpu。如果想提供多种可用设备的话,需要为每种资源分别启动一个 RPC Server 去注册。
- 注册的选项,主要包括两个 bool 型变量
- PreStartRequired: 容器启动前是否必须调用 PreStartContainer 方法
- GetPreferredAllocationAvailable: 是否提供优选方法 GetPreferredAllocation,若不提供,则由调度器直接决定为容器分配哪些设备 有一点需要注意, 插件并非必须为 GetPreferredAllocation() 或 PreStartContainer() 提供有用 的实现逻辑,调用 GetDevicePluginOptions() 时所返回的 DevicePluginOptions 消息中应该设置这些调用是否可用。kubelet 在真正调用这些函数之前,总会调用 GetDevicePluginOptions() 来查看是否存在这些可选的函数。 此外,Device Plugin 需要感知自己的 .sock 有没有被删除,以判断 Kubelet 有没有重启。
- Kubelet 根据注册信息,向 Device Plugin 发起 List-Watch 请求,获取可用设备的状态。
// ListAndWatch lists devices and update that list according to the health status
func (m *NvidiaDevicePlugin) ListAndWatch(e *pluginapi.Empty, s pluginapi.DevicePlugin_ListAndWatchServer) error {
s.Send(&pluginapi.ListAndWatchResponse{Devices: m.apiDevices()})
for {
select {
case <-m.stop:
return nil
case d := <-m.health:
// FIXME: there is no way to recover from the Unhealthy state.
d.Health = pluginapi.Unhealthy
log.Printf("'%s' device marked unhealthy: %s", m.resourceName, d.ID)
s.Send(&pluginapi.ListAndWatchResponse{Devices: m.apiDevices()})
}
}
}
这里,NVIDIA Device Plugin 会有一个定时检查设备健康程度的 Goroutine,如果发现设备不健康,就会通过 health 这个 Channel 传递不健康设备的指针,然后将这个设备设置成不健康。注意,NVIDIA Device Plugin 没有 Recover 逻辑,一个设备一旦被设置为不健康了,在 Device Plugin 重启之前,这个设备将一直处于不可用状态。 对于每个设备,Kubelet 只关心设备的 ID,健康状态,以及绑核(Numa核)逻辑。
// E.g:
// struct Device {
// ID: "GPU-fef8089b-4820-abfc-e83e-94318197576e",
// Health: "Healthy",
// Topology:
// Node:
// ID: 1
//}
type Device struct {
// A unique ID assigned by the device plugin used
// to identify devices during the communication
// Max length of this field is 63 characters
ID string `protobuf:"bytes,1,opt,name=ID,json=iD,proto3" json:"ID,omitempty"`
// Health of the device, can be healthy or unhealthy, see constants.go
Health string `protobuf:"bytes,2,opt,name=health,proto3" json:"health,omitempty"`
// Topology for device
Topology *TopologyInfo `protobuf:"bytes,3,opt,name=topology,proto3" json:"topology,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_sizecache int32 `json:"-"`
}
- Kubelet 通过 List-Watch 拿到可用设备列表后,会 Patch 更新 Node Allocatable 信息,并在本地文件 /var/lib/kubelet/device-plugins/kubelet_internal_checkpoint 中更新一份可用设备列表的缓存。
- Scheduler 通过 List-Watch 获取 Node 可用的设备,并在调度时根据 Pod Spec 进行分配。
分配部分
- Scheduler 完成调度后,会将 Pod Bind 到节点上,此时 Kubelet 会通过 List-Watch 感知到这一事件,并开始调用 Device Plugin 来为 Pod 准备设备的挂载等操作。
- 如前所述,Kubelet 先调用 GetDevicePluginOptions 获取当前 Device Plugin 提供的能力,比如是否可以优选,是否可以在容器启动前执行一些逻辑。
- 如果配置了优选,那么 Kubelet 会调用 Device Plugin 提供的 GetPreferredAllocation 方法。发送一系列 ContainerPreferredAllocationRequest,每个请求包含可用的设备列表和必须包含的设备列表。由 Device Plugin 返回根据策略选出的设备列表。
// PreferredAllocationRequest is passed via a call to GetPreferredAllocation()
// at pod admission time. The device plugin should take the list of
// `available_deviceIDs` and calculate a preferred allocation of size
// 'allocation_size' from them, making sure to include the set of devices
// listed in 'must_include_deviceIDs'.
type PreferredAllocationRequest struct {
ContainerRequests []*ContainerPreferredAllocationRequest `protobuf:"bytes,1,rep,name=container_requests,json=containerRequests,proto3" json:"container_requests,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_sizecache int32 `json:"-"`
}
type ContainerPreferredAllocationRequest struct {
// List of available deviceIDs from which to choose a preferred allocation
AvailableDeviceIDs []string `protobuf:"bytes,1,rep,name=available_deviceIDs,json=availableDeviceIDs,proto3" json:"available_deviceIDs,omitempty"`
// List of deviceIDs that must be included in the preferred allocation
MustIncludeDeviceIDs []string `protobuf:"bytes,2,rep,name=must_include_deviceIDs,json=mustIncludeDeviceIDs,proto3" json:"must_include_deviceIDs,omitempty"`
// Number of devices to include in the preferred allocation
AllocationSize int32 `protobuf:"varint,3,opt,name=allocation_size,json=allocationSize,proto3" json:"allocation_size,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_sizecache int32 `json:"-"`
}
// PreferredAllocationResponse returns a preferred allocation,
// resulting from a PreferredAllocationRequest.
type PreferredAllocationResponse struct {
ContainerResponses []*ContainerPreferredAllocationResponse `protobuf:"bytes,1,rep,name=container_responses,json=containerResponses,proto3" json:"container_responses,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_sizecache int32 `json:"-"`
}
type ContainerPreferredAllocationResponse struct {
DeviceIDs []string `protobuf:"bytes,1,rep,name=deviceIDs,proto3" json:"deviceIDs,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_sizecache int32 `json:"-"`
}
// GetPreferredAllocation returns the preferred allocation from the set of devices specified in the request
func (m *NvidiaDevicePlugin) GetPreferredAllocation(ctx context.Context, r *pluginapi.PreferredAllocationRequest) (*pluginapi.PreferredAllocationResponse, error) {
response := &pluginapi.PreferredAllocationResponse{}
for _, req := range r.ContainerRequests {
available, err := gpuallocator.NewDevicesFrom(req.AvailableDeviceIDs)
if err != nil {
return nil, fmt.Errorf("Unable to retrieve list of available devices: %v", err)
}
required, err := gpuallocator.NewDevicesFrom(req.MustIncludeDeviceIDs)
if err != nil {
return nil, fmt.Errorf("Unable to retrieve list of required devices: %v", err)
}
allocated := m.allocatePolicy.Allocate(available, required, int(req.AllocationSize))
var deviceIds []string
for _, device := range allocated {
deviceIds = append(deviceIds, device.UUID)
}
resp := &pluginapi.ContainerPreferredAllocationResponse{
DeviceIDs: deviceIds,
}
response.ContainerResponses = append(response.ContainerResponses, resp)
}
return response, nil
}
- Allocate 执行实际的分配行为,要求 Device Plugin 根据分配请求,提供需要注入的环境变量、 Annotation 和挂载行为,这些均仅 CRI 可见,不会体现在 Pod Spec 上。
// - Allocate is expected to be called during pod creation since allocation
// failures for any container would result in pod startup failure.
// - Allocate allows kubelet to exposes additional artifacts in a pod's
// environment as directed by the plugin.
// - Allocate allows Device Plugin to run device specific operations on
// the Devices requested
type AllocateRequest struct {
ContainerRequests []*ContainerAllocateRequest `protobuf:"bytes,1,rep,name=container_requests,json=containerRequests,proto3" json:"container_requests,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_sizecache int32 `json:"-"`
}
type ContainerAllocateRequest struct {
DevicesIDs []string `protobuf:"bytes,1,rep,name=devicesIDs,proto3" json:"devicesIDs,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_sizecache int32 `json:"-"`
}
// AllocateResponse includes the artifacts that needs to be injected into
// a container for accessing 'deviceIDs' that were mentioned as part of
// 'AllocateRequest'.
// Failure Handling:
// if Kubelet sends an allocation request for dev1 and dev2.
// Allocation on dev1 succeeds but allocation on dev2 fails.
// The Device plugin should send a ListAndWatch update and fail the
// Allocation request
type AllocateResponse struct {
ContainerResponses []*ContainerAllocateResponse `protobuf:"bytes,1,rep,name=container_responses,json=containerResponses,proto3" json:"container_responses,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_sizecache int32 `json:"-"`
}
type ContainerAllocateResponse struct {
// List of environment variable to be set in the container to access one of more devices.
Envs map[string]string `protobuf:"bytes,1,rep,name=envs,proto3" json:"envs,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"`
// Mounts for the container.
Mounts []*Mount `protobuf:"bytes,2,rep,name=mounts,proto3" json:"mounts,omitempty"`
// Devices for the container.
Devices []*DeviceSpec `protobuf:"bytes,3,rep,name=devices,proto3" json:"devices,omitempty"`
// Container annotations to pass to the container runtime
Annotations map[string]string `protobuf:"bytes,4,rep,name=annotations,proto3" json:"annotations,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_sizecache int32 `json:"-"`
}
// Allocate which return list of devices.
func (m *NvidiaDevicePlugin) Allocate(ctx context.Context, reqs *pluginapi.AllocateRequest) (*pluginapi.AllocateResponse, error) {
responses := pluginapi.AllocateResponse{}
for _, req := range reqs.ContainerRequests {
for _, id := range req.DevicesIDs {
if !m.deviceExists(id) {
return nil, fmt.Errorf("invalid allocation request for '%s': unknown device: %s", m.resourceName, id)
}
}
response := pluginapi.ContainerAllocateResponse{}
uuids := req.DevicesIDs
deviceIDs := m.deviceIDsFromUUIDs(uuids)
if deviceListStrategyFlag == DeviceListStrategyEnvvar {
response.Envs = m.apiEnvs(m.deviceListEnvvar, deviceIDs)
}
if deviceListStrategyFlag == DeviceListStrategyVolumeMounts {
response.Envs = m.apiEnvs(m.deviceListEnvvar, []string{deviceListAsVolumeMountsContainerPathRoot})
response.Mounts = m.apiMounts(deviceIDs)
}
if passDeviceSpecsFlag {
response.Devices = m.apiDeviceSpecs(nvidiaDriverRootFlag, uuids)
}
responses.ContainerResponses = append(responses.ContainerResponses, &response)
}
return &responses, nil
}
Device Plugin 所要做的就是构造 ContainerAllocateResponse 响应体,返回 Annotation,Env,Mounts。实际上,对于 NVIDIA Runtime,只需要在容器中配置 NVIDIA_VISIBLE_DEVICES 这个环境变量,在 CRI 中会自动根据该项去挂载对应的 GPU 设备以及相关的 toolkit,如果配置了 NVIDIA_VISIBLE_DEVICES=all,则挂载全部设备。不需要返回 Annotation 和 Mount 信息。
5. 执行启动前逻辑。如果 Device Plugin 在注册时或者 GetDevicePluginOptions 中声明了要求执行启动前逻辑,则 Kubelet 会在 CRI 配置好容器后,在启动前调用 PreStartContainer 逻辑,此时 Pod 依然处于 Pending 状态,直到调用完毕。
Device Plugin 的局限性
- Device Plugin 只能根据分配的结果去做相应的操作,或者在单机层面做一些单机层面的优选行为,却不能介入节点的筛选过程。
- 设备的分配是依据 Index 来决定的,以单个设备为粒度进行分配,不支持按份数拆分。因此, Aliyun 的 GPU 共享插件为了支持按显存分配,将每一 G 显存都设置成了一个单独的设备。
- 所有的 RPC 接口都不提供 Pod 信息,无法感知每一块设备具体是被哪个 Pod 分走了。为此,Aliyun 的做法是同时侵入调度器,在调度时,为 Pod 打上 Annotation,然后在 Device Plugin 中,通过 Kubelet 获取当前节点上 Pending 的 Pod,根据 Annotation 的内容做对应。字节跳动内部为了解决这个问题对 Kubelet 本身做了 Hack,在调用传递的 Context 中,封入了 Pod 的 Name 和 Namespace 信息。该特性有社区在提交 PR,但是两年没合并。
- Device Plugin 鼓励的工作模式是无状态的,希望开发者不要在 Device Plugin 中记录任何的状态信息。
相关评论