01月19, 2022

Kubernetes Device Plugin 原理讲解

很简单的 Kubernetes Device Plugin 原理讲解

简介

Kubernetes Device Plugin 是官方提供的一种扩展系统可用设备类型的方法。通过 device plugin,可以将一种设备类型“接入” Kubernetes,允许调度器根据需求为容器分配这些设备,并由 kubelet 和 CRI 执行实际的挂载。该特性在 v1.10 成为 beta,目前尚在完善。Device Plugin 体系仅包含对文件的操作,比如将 /dev 目录下的指定设备或者一些指定文件挂载到容器的指定目录下,并不包含驱动配置等操作,因此容器需要在镜像中自行适配相关设备的驱动。

设备插件
FEATURE STATE: Kubernetes v1.10 [beta]
Kubernetes 提供了一个 设备插件框架,你可以用它来将系统硬件资源发布到 Kubelet。
供应商可以实现设备插件,由你手动部署或作为 DaemonSet 来部署,而不必定制 Kubernetes 本身的代码。目标设备包括 GPU、高性能 NIC、FPGA、 InfiniBand 适配器以及其他类似的、可能需要特定于供应商的初始化和设置的计算资源。

Device Plugin 的注册和使用流程

以Nvidia Device Plugin 为例 Device Plugin 工作流程.jpeg

注册部分

  1. 首先,Device Plugin 通过 Kubelet 提供的 RPC 接口 Register 进行注册。
// Register registers the device plugin for the given resourceName with Kubelet.
func (m *NvidiaDevicePlugin) Register() error {
    conn, err := m.dial(pluginapi.KubeletSocket, 5*time.Second)
    if err != nil {
        return err
    }
    defer conn.Close()

    client := pluginapi.NewRegistrationClient(conn)
    reqt := &pluginapi.RegisterRequest{
        Version:      pluginapi.Version,
        Endpoint:     path.Base(m.socket),
        ResourceName: m.resourceName,
        Options: &pluginapi.DevicePluginOptions{
            GetPreferredAllocationAvailable: (m.allocatePolicy != nil),
        },
    }

    _, err = client.Register(context.Background(), reqt)
    if err != nil {
        return err
    }
    return nil
}

在注册时,Device Plugin 需要提供的参数包括

  • 自身的版本号
  • 自身提供的 RPC 服务监听的 Unix Socket 的地址。这个 Socket 必须放在 /var/lib/kubelet/device_plugins 目录下,Kubelet 重启时会清空这个目录下的 .sock 文件,因此 Device Plugin 需要监控文件的变化,当 Kubelet 重启的时候,重启 RPC 服务重新注册
  • 自身托管的资源的名称。比如 nvidia.com/gpu。如果想提供多种可用设备的话,需要为每种资源分别启动一个 RPC Server 去注册。
  • 注册的选项,主要包括两个 bool 型变量
    • PreStartRequired: 容器启动前是否必须调用 PreStartContainer 方法
    • GetPreferredAllocationAvailable: 是否提供优选方法 GetPreferredAllocation,若不提供,则由调度器直接决定为容器分配哪些设备 有一点需要注意, 插件并非必须为 GetPreferredAllocation() 或 PreStartContainer() 提供有用 的实现逻辑,调用 GetDevicePluginOptions() 时所返回的 DevicePluginOptions 消息中应该设置这些调用是否可用。kubelet 在真正调用这些函数之前,总会调用 GetDevicePluginOptions() 来查看是否存在这些可选的函数。 此外,Device Plugin 需要感知自己的 .sock 有没有被删除,以判断 Kubelet 有没有重启。
  1. Kubelet 根据注册信息,向 Device Plugin 发起 List-Watch 请求,获取可用设备的状态。
// ListAndWatch lists devices and update that list according to the health status
func (m *NvidiaDevicePlugin) ListAndWatch(e *pluginapi.Empty, s pluginapi.DevicePlugin_ListAndWatchServer) error {
    s.Send(&pluginapi.ListAndWatchResponse{Devices: m.apiDevices()})

    for {
        select {
        case <-m.stop:
            return nil
        case d := <-m.health:
            // FIXME: there is no way to recover from the Unhealthy state.
            d.Health = pluginapi.Unhealthy
            log.Printf("'%s' device marked unhealthy: %s", m.resourceName, d.ID)
            s.Send(&pluginapi.ListAndWatchResponse{Devices: m.apiDevices()})
        }
    }
}

这里,NVIDIA Device Plugin 会有一个定时检查设备健康程度的 Goroutine,如果发现设备不健康,就会通过 health 这个 Channel 传递不健康设备的指针,然后将这个设备设置成不健康。注意,NVIDIA Device Plugin 没有 Recover 逻辑,一个设备一旦被设置为不健康了,在 Device Plugin 重启之前,这个设备将一直处于不可用状态。 对于每个设备,Kubelet 只关心设备的 ID,健康状态,以及绑核(Numa核)逻辑。

// E.g:
// struct Device {
//    ID: "GPU-fef8089b-4820-abfc-e83e-94318197576e",
//    Health: "Healthy",
//    Topology:
//      Node:
//        ID: 1
//}
type Device struct {
    // A unique ID assigned by the device plugin used
    // to identify devices during the communication
    // Max length of this field is 63 characters
    ID string `protobuf:"bytes,1,opt,name=ID,json=iD,proto3" json:"ID,omitempty"`
    // Health of the device, can be healthy or unhealthy, see constants.go
    Health string `protobuf:"bytes,2,opt,name=health,proto3" json:"health,omitempty"`
    // Topology for device
    Topology             *TopologyInfo `protobuf:"bytes,3,opt,name=topology,proto3" json:"topology,omitempty"`
    XXX_NoUnkeyedLiteral struct{}      `json:"-"`
    XXX_sizecache        int32         `json:"-"`
}
  1. Kubelet 通过 List-Watch 拿到可用设备列表后,会 Patch 更新 Node Allocatable 信息,并在本地文件 /var/lib/kubelet/device-plugins/kubelet_internal_checkpoint 中更新一份可用设备列表的缓存。
  2. Scheduler 通过 List-Watch 获取 Node 可用的设备,并在调度时根据 Pod Spec 进行分配。

分配部分

  1. Scheduler 完成调度后,会将 Pod Bind 到节点上,此时 Kubelet 会通过 List-Watch 感知到这一事件,并开始调用 Device Plugin 来为 Pod 准备设备的挂载等操作。
  2. 如前所述,Kubelet 先调用 GetDevicePluginOptions 获取当前 Device Plugin 提供的能力,比如是否可以优选,是否可以在容器启动前执行一些逻辑。
  3. 如果配置了优选,那么 Kubelet 会调用 Device Plugin 提供的 GetPreferredAllocation 方法。发送一系列 ContainerPreferredAllocationRequest,每个请求包含可用的设备列表和必须包含的设备列表。由 Device Plugin 返回根据策略选出的设备列表。
// PreferredAllocationRequest is passed via a call to GetPreferredAllocation()
// at pod admission time. The device plugin should take the list of
// `available_deviceIDs` and calculate a preferred allocation of size
// 'allocation_size' from them, making sure to include the set of devices
// listed in 'must_include_deviceIDs'.
type PreferredAllocationRequest struct {
    ContainerRequests    []*ContainerPreferredAllocationRequest `protobuf:"bytes,1,rep,name=container_requests,json=containerRequests,proto3" json:"container_requests,omitempty"`
    XXX_NoUnkeyedLiteral struct{}                               `json:"-"`
    XXX_sizecache        int32                                  `json:"-"`
}

type ContainerPreferredAllocationRequest struct {
    // List of available deviceIDs from which to choose a preferred allocation
    AvailableDeviceIDs []string `protobuf:"bytes,1,rep,name=available_deviceIDs,json=availableDeviceIDs,proto3" json:"available_deviceIDs,omitempty"`
    // List of deviceIDs that must be included in the preferred allocation
    MustIncludeDeviceIDs []string `protobuf:"bytes,2,rep,name=must_include_deviceIDs,json=mustIncludeDeviceIDs,proto3" json:"must_include_deviceIDs,omitempty"`
    // Number of devices to include in the preferred allocation
    AllocationSize       int32    `protobuf:"varint,3,opt,name=allocation_size,json=allocationSize,proto3" json:"allocation_size,omitempty"`
    XXX_NoUnkeyedLiteral struct{} `json:"-"`
    XXX_sizecache        int32    `json:"-"`
}

// PreferredAllocationResponse returns a preferred allocation,
// resulting from a PreferredAllocationRequest.
type PreferredAllocationResponse struct {
    ContainerResponses   []*ContainerPreferredAllocationResponse `protobuf:"bytes,1,rep,name=container_responses,json=containerResponses,proto3" json:"container_responses,omitempty"`
    XXX_NoUnkeyedLiteral struct{}                                `json:"-"`
    XXX_sizecache        int32                                   `json:"-"`
}

type ContainerPreferredAllocationResponse struct {
    DeviceIDs            []string `protobuf:"bytes,1,rep,name=deviceIDs,proto3" json:"deviceIDs,omitempty"`
    XXX_NoUnkeyedLiteral struct{} `json:"-"`
    XXX_sizecache        int32    `json:"-"`
}

// GetPreferredAllocation returns the preferred allocation from the set of devices specified in the request
func (m *NvidiaDevicePlugin) GetPreferredAllocation(ctx context.Context, r *pluginapi.PreferredAllocationRequest) (*pluginapi.PreferredAllocationResponse, error) {
    response := &pluginapi.PreferredAllocationResponse{}
    for _, req := range r.ContainerRequests {
        available, err := gpuallocator.NewDevicesFrom(req.AvailableDeviceIDs)
        if err != nil {
            return nil, fmt.Errorf("Unable to retrieve list of available devices: %v", err)
        }

        required, err := gpuallocator.NewDevicesFrom(req.MustIncludeDeviceIDs)
        if err != nil {
            return nil, fmt.Errorf("Unable to retrieve list of required devices: %v", err)
        }

        allocated := m.allocatePolicy.Allocate(available, required, int(req.AllocationSize))

        var deviceIds []string
        for _, device := range allocated {
            deviceIds = append(deviceIds, device.UUID)
        }

        resp := &pluginapi.ContainerPreferredAllocationResponse{
            DeviceIDs: deviceIds,
        }

        response.ContainerResponses = append(response.ContainerResponses, resp)
    }
    return response, nil
}
  1. Allocate 执行实际的分配行为,要求 Device Plugin 根据分配请求,提供需要注入的环境变量、 Annotation 和挂载行为,这些均仅 CRI 可见,不会体现在 Pod Spec 上。
// - Allocate is expected to be called during pod creation since allocation
//   failures for any container would result in pod startup failure.
// - Allocate allows kubelet to exposes additional artifacts in a pod's
//   environment as directed by the plugin.
// - Allocate allows Device Plugin to run device specific operations on
//   the Devices requested
type AllocateRequest struct {
    ContainerRequests    []*ContainerAllocateRequest `protobuf:"bytes,1,rep,name=container_requests,json=containerRequests,proto3" json:"container_requests,omitempty"`
    XXX_NoUnkeyedLiteral struct{}                    `json:"-"`
    XXX_sizecache        int32                       `json:"-"`
}

type ContainerAllocateRequest struct {
    DevicesIDs           []string `protobuf:"bytes,1,rep,name=devicesIDs,proto3" json:"devicesIDs,omitempty"`
    XXX_NoUnkeyedLiteral struct{} `json:"-"`
    XXX_sizecache        int32    `json:"-"`
}

// AllocateResponse includes the artifacts that needs to be injected into
// a container for accessing 'deviceIDs' that were mentioned as part of
// 'AllocateRequest'.
// Failure Handling:
// if Kubelet sends an allocation request for dev1 and dev2.
// Allocation on dev1 succeeds but allocation on dev2 fails.
// The Device plugin should send a ListAndWatch update and fail the
// Allocation request
type AllocateResponse struct {
    ContainerResponses   []*ContainerAllocateResponse `protobuf:"bytes,1,rep,name=container_responses,json=containerResponses,proto3" json:"container_responses,omitempty"`
    XXX_NoUnkeyedLiteral struct{}                     `json:"-"`
    XXX_sizecache        int32                        `json:"-"`
}

type ContainerAllocateResponse struct {
    // List of environment variable to be set in the container to access one of more devices.
    Envs map[string]string `protobuf:"bytes,1,rep,name=envs,proto3" json:"envs,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"`
    // Mounts for the container.
    Mounts []*Mount `protobuf:"bytes,2,rep,name=mounts,proto3" json:"mounts,omitempty"`
    // Devices for the container.
    Devices []*DeviceSpec `protobuf:"bytes,3,rep,name=devices,proto3" json:"devices,omitempty"`
    // Container annotations to pass to the container runtime
    Annotations          map[string]string `protobuf:"bytes,4,rep,name=annotations,proto3" json:"annotations,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"`
    XXX_NoUnkeyedLiteral struct{}          `json:"-"`
    XXX_sizecache        int32             `json:"-"`
}

// Allocate which return list of devices.
func (m *NvidiaDevicePlugin) Allocate(ctx context.Context, reqs *pluginapi.AllocateRequest) (*pluginapi.AllocateResponse, error) {
    responses := pluginapi.AllocateResponse{}
    for _, req := range reqs.ContainerRequests {
        for _, id := range req.DevicesIDs {
            if !m.deviceExists(id) {
                return nil, fmt.Errorf("invalid allocation request for '%s': unknown device: %s", m.resourceName, id)
            }
        }

        response := pluginapi.ContainerAllocateResponse{}

        uuids := req.DevicesIDs
        deviceIDs := m.deviceIDsFromUUIDs(uuids)

        if deviceListStrategyFlag == DeviceListStrategyEnvvar {
            response.Envs = m.apiEnvs(m.deviceListEnvvar, deviceIDs)
        }
        if deviceListStrategyFlag == DeviceListStrategyVolumeMounts {
            response.Envs = m.apiEnvs(m.deviceListEnvvar, []string{deviceListAsVolumeMountsContainerPathRoot})
            response.Mounts = m.apiMounts(deviceIDs)
        }
        if passDeviceSpecsFlag {
            response.Devices = m.apiDeviceSpecs(nvidiaDriverRootFlag, uuids)
        }

        responses.ContainerResponses = append(responses.ContainerResponses, &response)
    }

    return &responses, nil
}

Device Plugin 所要做的就是构造 ContainerAllocateResponse 响应体,返回 Annotation,Env,Mounts。实际上,对于 NVIDIA Runtime,只需要在容器中配置 NVIDIA_VISIBLE_DEVICES 这个环境变量,在 CRI 中会自动根据该项去挂载对应的 GPU 设备以及相关的 toolkit,如果配置了 NVIDIA_VISIBLE_DEVICES=all,则挂载全部设备。不需要返回 Annotation 和 Mount 信息。
5. 执行启动前逻辑。如果 Device Plugin 在注册时或者 GetDevicePluginOptions 中声明了要求执行启动前逻辑,则 Kubelet 会在 CRI 配置好容器后,在启动前调用 PreStartContainer 逻辑,此时 Pod 依然处于 Pending 状态,直到调用完毕。

Device Plugin 的局限性

  • Device Plugin 只能根据分配的结果去做相应的操作,或者在单机层面做一些单机层面的优选行为,却不能介入节点的筛选过程。
  • 设备的分配是依据 Index 来决定的,以单个设备为粒度进行分配,不支持按份数拆分。因此, Aliyun 的 GPU 共享插件为了支持按显存分配,将每一 G 显存都设置成了一个单独的设备。
  • 所有的 RPC 接口都不提供 Pod 信息,无法感知每一块设备具体是被哪个 Pod 分走了。为此,Aliyun 的做法是同时侵入调度器,在调度时,为 Pod 打上 Annotation,然后在 Device Plugin 中,通过 Kubelet 获取当前节点上 Pending 的 Pod,根据 Annotation 的内容做对应。字节跳动内部为了解决这个问题对 Kubelet 本身做了 Hack,在调用传递的 Context 中,封入了 Pod 的 Name 和 Namespace 信息。该特性有社区在提交 PR,但是两年没合并。
  • Device Plugin 鼓励的工作模式是无状态的,希望开发者不要在 Device Plugin 中记录任何的状态信息。

本文链接:https://blog.magichc7.com/post/Kubernetes-Device-Plugin.html

-- EOF --

相关评论