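Getting Started
We will create a sample project to show how it works. This sample will:
- Reconcile a Memcached CR, which represents an instance of Memcached deployed/managed on the cluster
- Create a Deployment with the Memcached image
- Not allow more instances than the size defined in the CR
- Update the status of the Memcached CR
Create a project
First, create and navigate into a directory for your project. Then, initialize it using kubebuilder: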
mkdir $GOPATH/memcached-operator
cd $GOPATH/memcached-operator
kubebuilder init --domain=example.com
Create the Memcached API (CRD)
Next, we will create the API responsible for deploying and managing Memcached instances on the cluster.
kubebuilder create api --group cache --version v1alpha1 --kind Memcached
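Understanding APIs
The main goal of this command is to produce the Custom Resource (CR) and Custom Resource Definition (CRD) for the Memcached Kind. It creates the API with the group cache.example.com and the version v1alpha1, which together uniquely identify the new CRD of the Memcached Kind. With Kubebuilder, we can define the APIs and objects that represent our solutions on the platform.
Although this example adds only one Kind of resource, we can have as many Groups and Kinds as we need. To make it easier to understand, think of a CRD as the definition of our custom objects, and of CRs as instances of them.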
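As a rough illustration of how the pieces of these names fit together, the sketch below derives the API group, the apiVersion string, and the CRD name from the values passed to kubebuilder init and kubebuilder create api. The helper functions are hypothetical and are not part of Kubebuilder; the plural form is passed in explicitly rather than computed.

```go
package main

import "fmt"

// apiGroup joins the group given to "create api" with the domain given to "init".
func apiGroup(group, domain string) string {
	return group + "." + domain
}

// apiVersion is the value used in the apiVersion field of a CR manifest.
func apiVersion(group, domain, version string) string {
	return apiGroup(group, domain) + "/" + version
}

// crdName follows the <plural>.<group> naming seen in the generated CRD.
func crdName(plural, group, domain string) string {
	return plural + "." + apiGroup(group, domain)
}

func main() {
	fmt.Println(apiGroup("cache", "example.com"))
	fmt.Println(apiVersion("cache", "example.com", "v1alpha1"))
	fmt.Println(crdName("memcacheds", "cache", "example.com"))
}
```

The same strings appear later in the generated CRD (metadata.name, spec.group) and in the sample CR (apiVersion).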
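Define our API
Define the Specs
Now, we will define the values that each instance of the Memcached resource on the cluster can assume. In this example, we allow the number of instances to be configured as follows: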
type MemcachedSpec struct {
...
// +kubebuilder:validation:Minimum=0
// +required
Size *int32 `json:"size,omitempty"`
}
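One detail worth seeing in isolation is how a pointer field combined with omitempty is serialized. The sketch below uses only the standard library and a trimmed copy of the struct; the encode helper is hypothetical. It suggests that an unset size is dropped from the JSON, while an explicit zero is kept.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// MemcachedSpec mirrors the field shown above; only its JSON behavior is of interest here.
type MemcachedSpec struct {
	Size *int32 `json:"size,omitempty"`
}

// encode returns the JSON form of a spec, or the error text if marshaling fails.
func encode(spec MemcachedSpec) string {
	b, err := json.Marshal(spec)
	if err != nil {
		return err.Error()
	}
	return string(b)
}

func main() {
	zero := int32(0)
	fmt.Println(encode(MemcachedSpec{}))            // size is nil, so the key is omitted
	fmt.Println(encode(MemcachedSpec{Size: &zero})) // an explicit zero is preserved
}
```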
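Define the Status
We also want to track the status of the operations performed to manage the Memcached CR. This lets us inspect what the Custom Resource reports about itself and determine whether everything succeeded or an error was encountered, just as we do with any resource from the Kubernetes API.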
// MemcachedStatus defines the observed state of Memcached
type MemcachedStatus struct {
Conditions []metav1.Condition `json:"conditions,omitempty" patchStrategy:"merge" patchMergeKey:"type" protobuf:"bytes,1,rep,name=conditions"`
}
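The conditions list is keyed by type, so setting a condition replaces the entry of the same type instead of appending a duplicate. The following standard-library sketch models only that behavior; Condition is a simplified stand-in for metav1.Condition, and setCondition is a hypothetical helper, not the apimachinery function used later in the controller.

```go
package main

import "fmt"

// Condition is a simplified stand-in for metav1.Condition (assumption: three fields only).
type Condition struct {
	Type   string
	Status string // "True", "False", or "Unknown"
	Reason string
}

// setCondition replaces the entry with the same Type, or appends a new one,
// so the list never holds two conditions of the same type.
func setCondition(conds []Condition, c Condition) []Condition {
	for i := range conds {
		if conds[i].Type == c.Type {
			conds[i] = c
			return conds
		}
	}
	return append(conds, c)
}

func main() {
	var conds []Condition
	conds = setCondition(conds, Condition{Type: "Available", Status: "Unknown", Reason: "Reconciling"})
	conds = setCondition(conds, Condition{Type: "Available", Status: "True", Reason: "Reconciling"})
	fmt.Println(len(conds), conds[0].Status)
}
```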
Markers and validations
Furthermore, we want to validate the values in our Custom Resource to ensure they are valid. To do so, we will use markers such as +kubebuilder:validation:Minimum=1.
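These bounds are enforced by the API server from the generated OpenAPI schema, so no Go code of ours runs for them. Purely to illustrate the semantics, the sketch below applies inclusive lower and upper bounds to an optional integer. validateSize is a hypothetical helper, and accepting a nil value is an assumption that matches an optional field.

```go
package main

import "fmt"

// validateSize mimics inclusive Minimum/Maximum bounds on an optional integer field.
// A nil size is accepted here (assumption: the field is optional).
func validateSize(size *int32, lo, hi int32) error {
	if size == nil {
		return nil
	}
	if *size < lo {
		return fmt.Errorf("size %d is less than minimum %d", *size, lo)
	}
	if *size > hi {
		return fmt.Errorf("size %d is greater than maximum %d", *size, hi)
	}
	return nil
}

func main() {
	for _, v := range []int32{0, 1, 3, 4} {
		size := v
		fmt.Println(v, validateSize(&size, 1, 3))
	}
}
```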
Now, let's look at our complete example.
Apache License
Copyright 2025 The Kubernetes authors.
Licensed under the Apache License, Version 2.0 (the “License”); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an “AS IS” BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Imports
package v1alpha1
import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
// EDIT THIS FILE! THIS IS SCAFFOLDING FOR YOU TO OWN!
// NOTE: json tags are required. Any new fields you add must have json tags for the fields to be serialized.
// MemcachedSpec defines the desired state of Memcached
type MemcachedSpec struct {
// INSERT ADDITIONAL SPEC FIELDS - desired state of cluster
// Important: Run "make" to regenerate code after modifying this file
// The following markers will use OpenAPI v3 schema to validate the value
// More info: https://book.kubebuilder.io/reference/markers/crd-validation.html
// size defines the number of Memcached instances
// The following markers will use OpenAPI v3 schema to validate the value
// More info: https://book.kubebuilder.io/reference/markers/crd-validation.html
// +kubebuilder:validation:Minimum=1
// +kubebuilder:validation:Maximum=3
// +kubebuilder:validation:ExclusiveMaximum=false
// +optional
Size *int32 `json:"size,omitempty"`
}
// MemcachedStatus defines the observed state of Memcached.
type MemcachedStatus struct {
// INSERT ADDITIONAL STATUS FIELD - define observed state of cluster
// Important: Run "make" to regenerate code after modifying this file
// For Kubernetes API conventions, see:
// https://github.com/kubernetes/community/blob/master/contributors/devel/sig-architecture/api-conventions.md#typical-status-properties
// conditions represent the current state of the Memcached resource.
// Each condition has a unique type and reflects the status of a specific aspect of the resource.
//
// Standard condition types include:
// - "Available": the resource is fully functional
// - "Progressing": the resource is being created or updated
// - "Degraded": the resource failed to reach or maintain its desired state
//
// The status of each condition is one of True, False, or Unknown.
// +listType=map
// +listMapKey=type
// +optional
Conditions []metav1.Condition `json:"conditions,omitempty"`
}
// +kubebuilder:object:root=true
// +kubebuilder:subresource:status
// Memcached is the Schema for the memcacheds API
type Memcached struct {
metav1.TypeMeta `json:",inline"`
// metadata is a standard object metadata
// +optional
metav1.ObjectMeta `json:"metadata,omitempty,omitzero"`
// spec defines the desired state of Memcached
// +required
Spec MemcachedSpec `json:"spec"`
// status defines the observed state of Memcached
// +optional
Status MemcachedStatus `json:"status,omitempty,omitzero"`
}
// +kubebuilder:object:root=true
// MemcachedList contains a list of Memcached
type MemcachedList struct {
metav1.TypeMeta `json:",inline"`
metav1.ListMeta `json:"metadata,omitempty"`
Items []Memcached `json:"items"`
}
func init() {
SchemeBuilder.Register(&Memcached{}, &MemcachedList{})
}
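Generate manifests with the specs and validations
To generate all required files:
- Run make generate to create the DeepCopy implementations in api/v1alpha1/zz_generated.deepcopy.go.
- Then, run make manifests to generate the CRD manifests under config/crd/bases and a sample for them under config/samples.
Both commands use controller-gen, but with different flags for code generation and manifest generation, respectively.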
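To give a feel for what the generated DeepCopy code does, here is a hand-written sketch for a trimmed MemcachedSpec. It is not the generated file; it only illustrates that the pointer field is copied by value, so the copy does not share memory with the original.

```go
package main

import "fmt"

// MemcachedSpec is a trimmed copy of the API type, without tags or markers.
type MemcachedSpec struct {
	Size *int32
}

// DeepCopyInto copies the receiver into out without sharing the Size pointer.
func (in *MemcachedSpec) DeepCopyInto(out *MemcachedSpec) {
	*out = *in
	if in.Size != nil {
		out.Size = new(int32)
		*out.Size = *in.Size
	}
}

// DeepCopy returns an independent copy, or nil for a nil receiver.
func (in *MemcachedSpec) DeepCopy() *MemcachedSpec {
	if in == nil {
		return nil
	}
	out := new(MemcachedSpec)
	in.DeepCopyInto(out)
	return out
}

func main() {
	size := int32(1)
	original := &MemcachedSpec{Size: &size}
	clone := original.DeepCopy()
	*clone.Size = 3 // changing the clone leaves the original untouched
	fmt.Println(*original.Size, *clone.Size)
}
```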
config/crd/bases/cache.example.com_memcacheds.yaml: Our Memcached CRD
---
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
  annotations:
    controller-gen.kubebuilder.io/version: v0.19.0
  name: memcacheds.cache.example.com
spec:
  group: cache.example.com
  names:
    kind: Memcached
    listKind: MemcachedList
    plural: memcacheds
    singular: memcached
  scope: Namespaced
  versions:
  - name: v1alpha1
    schema:
      openAPIV3Schema:
        description: Memcached is the Schema for the memcacheds API
        properties:
          apiVersion:
            description: |-
              APIVersion defines the versioned schema of this representation of an object.
              Servers should convert recognized schemas to the latest internal value, and
              may reject unrecognized values.
              More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources
            type: string
          kind:
            description: |-
              Kind is a string value representing the REST resource this object represents.
              Servers may infer this from the endpoint the client submits requests to.
              Cannot be updated.
              In CamelCase.
              More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds
            type: string
          metadata:
            type: object
          spec:
            description: spec defines the desired state of Memcached
            properties:
              size:
                description: |-
                  size defines the number of Memcached instances
                  The following markers will use OpenAPI v3 schema to validate the value
                  More info: https://book.kubebuilder.io/reference/markers/crd-validation.html
                format: int32
                maximum: 3
                minimum: 1
                type: integer
            type: object
          status:
            description: status defines the observed state of Memcached
            properties:
              conditions:
                description: |-
                  conditions represent the current state of the Memcached resource.
                  Each condition has a unique type and reflects the status of a specific aspect of the resource.
                  Standard condition types include:
                  - "Available": the resource is fully functional
                  - "Progressing": the resource is being created or updated
                  - "Degraded": the resource failed to reach or maintain its desired state
                  The status of each condition is one of True, False, or Unknown.
                items:
                  description: Condition contains details for one aspect of the current
                    state of this API Resource.
                  properties:
                    lastTransitionTime:
                      description: |-
                        lastTransitionTime is the last time the condition transitioned from one status to another.
                        This should be when the underlying condition changed. If that is not known, then using the time when the API field changed is acceptable.
                      format: date-time
                      type: string
                    message:
                      description: |-
                        message is a human readable message indicating details about the transition.
                        This may be an empty string.
                      maxLength: 32768
                      type: string
                    observedGeneration:
                      description: |-
                        observedGeneration represents the .metadata.generation that the condition was set based upon.
                        For instance, if .metadata.generation is currently 12, but the .status.conditions[x].observedGeneration is 9, the condition is out of date
                        with respect to the current state of the instance.
                      format: int64
                      minimum: 0
                      type: integer
                    reason:
                      description: |-
                        reason contains a programmatic identifier indicating the reason for the condition's last transition.
                        Producers of specific condition types may define expected values and meanings for this field,
                        and whether the values are considered a guaranteed API.
                        The value should be a CamelCase string.
                        This field may not be empty.
                      maxLength: 1024
                      minLength: 1
                      pattern: ^[A-Za-z]([A-Za-z0-9_,:]*[A-Za-z0-9_])?$
                      type: string
                    status:
                      description: status of the condition, one of True, False, Unknown.
                      enum:
                      - "True"
                      - "False"
                      - Unknown
                      type: string
                    type:
                      description: type of condition in CamelCase or in foo.example.com/CamelCase.
                      maxLength: 316
                      pattern: ^([a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*/)?(([A-Za-z0-9][-A-Za-z0-9_.]*)?[A-Za-z0-9])$
                      type: string
                  required:
                  - lastTransitionTime
                  - message
                  - reason
                  - status
                  - type
                  type: object
                type: array
                x-kubernetes-list-map-keys:
                - type
                x-kubernetes-list-type: map
            type: object
        required:
        - spec
        type: object
    served: true
    storage: true
    subresources:
      status: {}
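Sample of Custom Resources
The manifests in the config/samples directory are examples of Custom Resources that can be applied to the cluster. In this example, applying the resource below to the cluster would produce a Deployment with a single replica (see size: 1).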
apiVersion: cache.example.com/v1alpha1
kind: Memcached
metadata:
  labels:
    app.kubernetes.io/name: project
    app.kubernetes.io/managed-by: kustomize
  name: memcached-sample
spec:
  # TODO(user): edit the following value to ensure the number
  # of Pods/Instances your Operand must have on cluster
  size: 1
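Reconciliation Process
In simple terms, Kubernetes lets us declare the desired state of a system, and its controllers then continuously observe the cluster and take action to make the actual state match the desired state. The process is similar for our custom APIs and controllers. Remember, we are extending Kubernetes behaviors and APIs to fit our specific needs.
In our controller, we will implement a reconciliation process.
Essentially, the reconciliation process works as a loop: it keeps checking conditions and performing the necessary actions until the desired state is reached. It keeps running until all conditions in the system match the desired state defined in our implementation.
Here is a pseudo-code example to illustrate this: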
reconcile App {
// Check if a Deployment for the app exists, if not, create one
// If there's an error, then restart from the beginning of the reconcile
if err != nil {
return reconcile.Result{}, err
}
// Check if a Service for the app exists, if not, create one
// If there's an error, then restart from the beginning of the reconcile
if err != nil {
return reconcile.Result{}, err
}
// Look for Database CR/CRD
// Check the Database Deployment's replicas size
// If deployment.replicas size doesn't match cr.size, then update it
// Then, restart from the beginning of the reconcile. For example, by returning `reconcile.Result{Requeue: true}, nil`.
if err != nil {
return reconcile.Result{Requeue: true}, nil
}
...
// If at the end of the loop:
// Everything was executed successfully, and the reconcile can stop
return reconcile.Result{}, nil
}
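The loop described by the pseudo-code above can also be exercised as a small, self-contained Go program. This is a toy model under stated assumptions: the cluster struct stands in for the API server state, and reconcile and runUntilStable are hypothetical helpers rather than controller-runtime APIs. Each call performs at most one corrective step and asks to be called again, similar in spirit to returning Requeue: true.

```go
package main

import "fmt"

// cluster is a toy stand-in for the state held by the API server
// (assumption: at most one Deployment exists).
type cluster struct {
	deploymentExists bool
	replicas         int32
}

// reconcile performs at most one corrective step and reports whether
// it should be called again.
func reconcile(c *cluster, desired int32) bool {
	if !c.deploymentExists {
		c.deploymentExists = true
		c.replicas = desired
		return true // created: run again to verify the state
	}
	if c.replicas != desired {
		c.replicas = desired
		return true // resized: run again to verify the state
	}
	return false // actual state matches desired state
}

// runUntilStable calls reconcile until no further work is requested
// and returns the number of calls made.
func runUntilStable(c *cluster, desired int32) int {
	calls := 1
	for reconcile(c, desired) {
		calls++
	}
	return calls
}

func main() {
	c := &cluster{}
	fmt.Println(runUntilStable(c, 1), c.replicas) // create, then a no-op pass
	fmt.Println(runUntilStable(c, 3), c.replicas) // resize, then a no-op pass
	fmt.Println(runUntilStable(c, 3), c.replicas) // already converged
}
```

Because reconcile only compares state and corrects differences, calling it again on a converged cluster changes nothing, which is the idempotency the real controller is expected to have.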
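In the context of our example
When the sample Custom Resource (CR) is applied to the cluster (for example, kubectl apply -f config/samples/cache_v1alpha1_memcached.yaml), we want to ensure that a Deployment is created for the Memcached image and that its number of replicas matches the one defined in the CR.
To achieve this, we first need an operation that checks whether a Deployment for our Memcached instance already exists on the cluster. If it does not, the controller creates it. The reconciliation process must therefore include an operation that keeps this desired state consistently maintained. This operation would involve: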
// Check if the deployment already exists, if not create a new one
found := &appsv1.Deployment{}
err = r.Get(ctx, types.NamespacedName{Name: memcached.Name, Namespace: memcached.Namespace}, found)
if err != nil && apierrors.IsNotFound(err) {
// Define a new deployment
dep := r.deploymentForMemcached()
// Create the Deployment on the cluster
if err = r.Create(ctx, dep); err != nil {
log.Error(err, "Failed to create new Deployment",
"Deployment.Namespace", dep.Namespace, "Deployment.Name", dep.Name)
return ctrl.Result{}, err
}
...
}
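Next, note that the deploymentForMemcached() function needs to define and return the Deployment that should be created on the cluster. This function should construct the Deployment object with the necessary specifications, as in the following example: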
dep := &appsv1.Deployment{
Spec: appsv1.DeploymentSpec{
Replicas: &replicas,
Template: corev1.PodTemplateSpec{
Spec: corev1.PodSpec{
Containers: []corev1.Container{{
Image: "memcached:1.6.26-alpine3.19",
Name: "memcached",
ImagePullPolicy: corev1.PullIfNotPresent,
Ports: []corev1.ContainerPort{{
ContainerPort: 11211,
Name: "memcached",
}},
Command: []string{"memcached", "--memory-limit=64", "-o", "modern", "-v"},
}},
},
},
},
}
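Additionally, we need a mechanism that verifies whether the number of Memcached replicas on the cluster matches the desired count specified in the CR. If there is a discrepancy, the reconciliation must update the cluster to restore consistency. This means that whenever a CR of the Memcached Kind is created or updated on the cluster, the controller keeps reconciling until the actual number of replicas matches the desired count, as in the following example: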
...
// memcached.Spec.Size is a *int32, so it is dereferenced before comparing.
size := memcached.Spec.Size
if size != nil && (found.Spec.Replicas == nil || *found.Spec.Replicas != *size) {
	found.Spec.Replicas = size
	if err = r.Update(ctx, found); err != nil {
		log.Error(err, "Failed to update Deployment",
			"Deployment.Namespace", found.Namespace, "Deployment.Name", found.Name)
		return ctrl.Result{}, err
	}
	...
}
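Since both the size in the CR and the replica count in the Deployment are pointers, the comparison has to tolerate nil on either side. A minimal sketch of that check, using hypothetical helper names and the same defaulting to zero that the full controller applies when the size is unset:

```go
package main

import "fmt"

// desiredReplicas treats an unset size as zero.
func desiredReplicas(size *int32) int32 {
	if size == nil {
		return 0
	}
	return *size
}

// needsResize reports whether the Deployment's replica count differs
// from the size requested in the CR. A nil current value always needs an update.
func needsResize(current, size *int32) bool {
	return current == nil || *current != desiredReplicas(size)
}

func main() {
	one, three := int32(1), int32(3)
	fmt.Println(needsResize(&one, &three))   // replica count differs
	fmt.Println(needsResize(&three, &three)) // already matches
	fmt.Println(needsResize(nil, &three))    // replicas not set on the Deployment
}
```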
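Now, you can review the complete controller responsible for managing Custom Resources of the Memcached Kind. This controller ensures that the desired state is maintained on the cluster, so that the Memcached instance keeps running with the number of replicas specified by the user.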
internal/controller/memcached_controller.go: Our Controller Implementation
/*
Copyright 2025 The Kubernetes authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controller
import (
"context"
"fmt"
"time"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/utils/ptr"
"k8s.io/apimachinery/pkg/runtime"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
logf "sigs.k8s.io/controller-runtime/pkg/log"
cachev1alpha1 "example.com/memcached/api/v1alpha1"
)
// Definitions to manage status conditions
const (
// typeAvailableMemcached represents the status of the Deployment reconciliation
typeAvailableMemcached = "Available"
)
// MemcachedReconciler reconciles a Memcached object
type MemcachedReconciler struct {
client.Client
Scheme *runtime.Scheme
}
// +kubebuilder:rbac:groups=cache.example.com,resources=memcacheds,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=cache.example.com,resources=memcacheds/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=cache.example.com,resources=memcacheds/finalizers,verbs=update
// +kubebuilder:rbac:groups=core,resources=events,verbs=create;patch
// +kubebuilder:rbac:groups=apps,resources=deployments,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=core,resources=pods,verbs=get;list;watch
// Reconcile is part of the main kubernetes reconciliation loop which aims to
// move the current state of the cluster closer to the desired state.
// It is essential for the controller's reconciliation loop to be idempotent. By following the Operator
// pattern you will create Controllers which provide a reconcile function
// responsible for synchronizing resources until the desired state is reached on the cluster.
// Breaking this recommendation goes against the design principles of controller-runtime
// and may lead to unforeseen consequences such as resources becoming stuck and requiring manual intervention.
// For further info:
// - About Operator Pattern: https://kubernetes.io/docs/concepts/extend-kubernetes/operator/
// - About Controllers: https://kubernetes.io/docs/concepts/architecture/controller/
//
// For more details, check Reconcile and its Result here:
// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.22.1/pkg/reconcile
func (r *MemcachedReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
log := logf.FromContext(ctx)
// Fetch the Memcached instance
// The purpose is to check whether the Custom Resource for the Kind Memcached
// is applied on the cluster; if not, we return nil to stop the reconciliation
memcached := &cachev1alpha1.Memcached{}
err := r.Get(ctx, req.NamespacedName, memcached)
if err != nil {
if apierrors.IsNotFound(err) {
// If the custom resource is not found then it usually means that it was deleted or not created
// In this way, we will stop the reconciliation
log.Info("memcached resource not found. Ignoring since object must be deleted")
return ctrl.Result{}, nil
}
// Error reading the object - requeue the request.
log.Error(err, "Failed to get memcached")
return ctrl.Result{}, err
}
// Let's just set the status as Unknown when no status is available
if len(memcached.Status.Conditions) == 0 {
meta.SetStatusCondition(&memcached.Status.Conditions, metav1.Condition{Type: typeAvailableMemcached, Status: metav1.ConditionUnknown, Reason: "Reconciling", Message: "Starting reconciliation"})
if err = r.Status().Update(ctx, memcached); err != nil {
log.Error(err, "Failed to update Memcached status")
return ctrl.Result{}, err
}
// Let's re-fetch the memcached Custom Resource after updating the status
// so that we have the latest state of the resource on the cluster and we will avoid
// raising the error "the object has been modified, please apply
// your changes to the latest version and try again" which would re-trigger the reconciliation
// if we try to update it again in the following operations
if err := r.Get(ctx, req.NamespacedName, memcached); err != nil {
log.Error(err, "Failed to re-fetch memcached")
return ctrl.Result{}, err
}
}
// Check if the deployment already exists, if not create a new one
found := &appsv1.Deployment{}
err = r.Get(ctx, types.NamespacedName{Name: memcached.Name, Namespace: memcached.Namespace}, found)
if err != nil && apierrors.IsNotFound(err) {
// Define a new deployment
dep, err := r.deploymentForMemcached(memcached)
if err != nil {
log.Error(err, "Failed to define new Deployment resource for Memcached")
// The following implementation will update the status
meta.SetStatusCondition(&memcached.Status.Conditions, metav1.Condition{Type: typeAvailableMemcached,
Status: metav1.ConditionFalse, Reason: "Reconciling",
Message: fmt.Sprintf("Failed to create Deployment for the custom resource (%s): (%s)", memcached.Name, err)})
if err := r.Status().Update(ctx, memcached); err != nil {
log.Error(err, "Failed to update Memcached status")
return ctrl.Result{}, err
}
return ctrl.Result{}, err
}
log.Info("Creating a new Deployment",
"Deployment.Namespace", dep.Namespace, "Deployment.Name", dep.Name)
if err = r.Create(ctx, dep); err != nil {
log.Error(err, "Failed to create new Deployment",
"Deployment.Namespace", dep.Namespace, "Deployment.Name", dep.Name)
return ctrl.Result{}, err
}
// Deployment created successfully
// We will requeue the reconciliation so that we can ensure the state
// and move forward for the next operations
return ctrl.Result{RequeueAfter: time.Minute}, nil
} else if err != nil {
log.Error(err, "Failed to get Deployment")
// Let's return the error so that the reconciliation is re-triggered
return ctrl.Result{}, err
}
// If the size is not defined in the Custom Resource then we will set the desired replicas to 0
var desiredReplicas int32 = 0
if memcached.Spec.Size != nil {
desiredReplicas = *memcached.Spec.Size
}
// The CRD API defines that the Memcached type have a MemcachedSpec.Size field
// to set the quantity of Deployment instances to the desired state on the cluster.
// Therefore, the following code will ensure the Deployment size is the same as defined
// via the Size spec of the Custom Resource which we are reconciling.
if found.Spec.Replicas == nil || *found.Spec.Replicas != desiredReplicas {
found.Spec.Replicas = ptr.To(desiredReplicas)
if err = r.Update(ctx, found); err != nil {
log.Error(err, "Failed to update Deployment",
"Deployment.Namespace", found.Namespace, "Deployment.Name", found.Name)
// Re-fetch the memcached Custom Resource before updating the status
// so that we have the latest state of the resource on the cluster and we will avoid
// raising the error "the object has been modified, please apply
// your changes to the latest version and try again" which would re-trigger the reconciliation
if err := r.Get(ctx, req.NamespacedName, memcached); err != nil {
log.Error(err, "Failed to re-fetch memcached")
return ctrl.Result{}, err
}
// The following implementation will update the status
meta.SetStatusCondition(&memcached.Status.Conditions, metav1.Condition{Type: typeAvailableMemcached,
Status: metav1.ConditionFalse, Reason: "Resizing",
Message: fmt.Sprintf("Failed to update the size for the custom resource (%s): (%s)", memcached.Name, err)})
if err := r.Status().Update(ctx, memcached); err != nil {
log.Error(err, "Failed to update Memcached status")
return ctrl.Result{}, err
}
return ctrl.Result{}, err
}
// Now that we have updated the size, we want to requeue the reconciliation
// so that we have the latest state of the resource before the next
// update. It also helps ensure the desired state on the cluster
return ctrl.Result{Requeue: true}, nil
}
// The following implementation will update the status
meta.SetStatusCondition(&memcached.Status.Conditions, metav1.Condition{Type: typeAvailableMemcached,
Status: metav1.ConditionTrue, Reason: "Reconciling",
Message: fmt.Sprintf("Deployment for custom resource (%s) with %d replicas created successfully", memcached.Name, desiredReplicas)})
if err := r.Status().Update(ctx, memcached); err != nil {
log.Error(err, "Failed to update Memcached status")
return ctrl.Result{}, err
}
return ctrl.Result{}, nil
}
// SetupWithManager sets up the controller with the Manager.
func (r *MemcachedReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&cachev1alpha1.Memcached{}).
Owns(&appsv1.Deployment{}).
Named("memcached").
Complete(r)
}
// deploymentForMemcached returns a Memcached Deployment object
func (r *MemcachedReconciler) deploymentForMemcached(
memcached *cachev1alpha1.Memcached) (*appsv1.Deployment, error) {
image := "memcached:1.6.26-alpine3.19"
dep := &appsv1.Deployment{
ObjectMeta: metav1.ObjectMeta{
Name: memcached.Name,
Namespace: memcached.Namespace,
},
Spec: appsv1.DeploymentSpec{
Replicas: memcached.Spec.Size,
Selector: &metav1.LabelSelector{
MatchLabels: map[string]string{"app.kubernetes.io/name": "project"},
},
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{"app.kubernetes.io/name": "project"},
},
Spec: corev1.PodSpec{
SecurityContext: &corev1.PodSecurityContext{
RunAsNonRoot: ptr.To(true),
SeccompProfile: &corev1.SeccompProfile{
Type: corev1.SeccompProfileTypeRuntimeDefault,
},
},
Containers: []corev1.Container{{
Image: image,
Name: "memcached",
ImagePullPolicy: corev1.PullIfNotPresent,
// Ensure restrictive context for the container
// More info: https://kubernetes.io/docs/concepts/security/pod-security-standards/#restricted
SecurityContext: &corev1.SecurityContext{
RunAsNonRoot: ptr.To(true),
RunAsUser: ptr.To(int64(1001)),
AllowPrivilegeEscalation: ptr.To(false),
Capabilities: &corev1.Capabilities{
Drop: []corev1.Capability{
"ALL",
},
},
},
Ports: []corev1.ContainerPort{{
ContainerPort: 11211,
Name: "memcached",
}},
Command: []string{"memcached", "--memory-limit=64", "-o", "modern", "-v"},
}},
},
},
},
}
// Set the ownerRef for the Deployment
// More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/owners-dependents/
if err := ctrl.SetControllerReference(memcached, dep, r.Scheme); err != nil {
return nil, err
}
return dep, nil
}
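Diving into the Controller Implementation
Configuring the Manager to watch resources
The core idea is to watch the resources that matter to the controller. When a resource the controller is interested in changes, the Watch triggers the controller's reconciliation loop, ensuring that the actual state of the resource matches the desired state defined in the controller's logic.
Notice how we configured the Manager to monitor events such as the creation, update, or deletion of a Custom Resource (CR) of the Memcached Kind, as well as any change to the Deployment that the controller manages and owns: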
// SetupWithManager sets up the controller with the Manager.
// The Deployment is also watched to ensure its
// desired state in the cluster.
func (r *MemcachedReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
// Watch the Memcached Custom Resource and trigger reconciliation whenever it
// is created, updated, or deleted
For(&cachev1alpha1.Memcached{}).
// Watch the Deployment managed by the Memcached controller. If any changes occur to the Deployment
// owned and managed by this controller, it will trigger reconciliation, ensuring that the cluster
// state aligns with the desired state.
Owns(&appsv1.Deployment{}).
Complete(r)
}
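But how does the Manager know which resources it owns?
We do not want our controller to watch every Deployment on the cluster and trigger our reconciliation loop; we only want it triggered when the particular Deployment running our Memcached instance changes. For example, if someone accidentally deletes our Deployment or changes its number of replicas, we want to trigger the reconciliation so that it returns to the desired state.
The Manager knows which Deployment to observe because we set the ownerRef (Owner Reference):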
if err := ctrl.SetControllerReference(memcached, dep, r.Scheme); err != nil {
return nil, err
}
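Conceptually, when an owned object changes, its owner references are inspected to find the CR that should be reconciled. The sketch below models that lookup with the standard library only. ownerRef is a simplified stand-in for metav1.OwnerReference, requestFor is a hypothetical helper, and comparing the full apiVersion string is a simplification made for brevity.

```go
package main

import "fmt"

// ownerRef is a simplified stand-in for metav1.OwnerReference.
type ownerRef struct {
	APIVersion string
	Kind       string
	Name       string
	Controller bool
}

// requestFor returns the name of the Memcached CR to reconcile when an object
// carrying refs changes, or "" when the object is not controlled by a Memcached.
func requestFor(refs []ownerRef) string {
	for _, ref := range refs {
		if ref.Controller && ref.Kind == "Memcached" && ref.APIVersion == "cache.example.com/v1alpha1" {
			return ref.Name
		}
	}
	return ""
}

func main() {
	owned := []ownerRef{{
		APIVersion: "cache.example.com/v1alpha1",
		Kind:       "Memcached",
		Name:       "memcached-sample",
		Controller: true,
	}}
	unrelated := []ownerRef{{APIVersion: "apps/v1", Kind: "ReplicaSet", Name: "web", Controller: true}}
	fmt.Printf("%q %q\n", requestFor(owned), requestFor(unrelated))
}
```

A Deployment without a matching controller reference yields no request, which is why unrelated Deployments do not trigger this controller.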
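Granting permissions
It is important to ensure that the controller has the permissions (for example, to create, get, update, and list) it needs to manage its resources.
RBAC permissions are now configured via RBAC markers, which are used to generate and update the manifest files in config/rbac/. These markers can be found (and should be defined) on the Reconcile() method of each controller, as in the following example: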
// +kubebuilder:rbac:groups=cache.example.com,resources=memcacheds,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=cache.example.com,resources=memcacheds/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=cache.example.com,resources=memcacheds/finalizers,verbs=update
// +kubebuilder:rbac:groups=core,resources=events,verbs=create;patch
// +kubebuilder:rbac:groups=apps,resources=deployments,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=core,resources=pods,verbs=get;list;watch
After making changes to the controller, run the make manifests command. This prompts controller-gen to refresh the files under config/rbac.
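To make the relationship between a marker and a rule in role.yaml more concrete, here is a toy parser written with the standard library. It is not how controller-gen is implemented; parseRBACMarker and the rule struct are hypothetical, the input is assumed to be well formed, and only the groups, resources, and verbs keys are handled.

```go
package main

import (
	"fmt"
	"strings"
)

// rule is a simplified stand-in for one entry under "rules" in role.yaml.
type rule struct {
	Groups, Resources, Verbs []string
}

// parseRBACMarker reads the key=value pairs of a "+kubebuilder:rbac:" marker.
// It returns false when the line carries no such marker.
func parseRBACMarker(line string) (rule, bool) {
	const prefix = "+kubebuilder:rbac:"
	idx := strings.Index(line, prefix)
	if idx < 0 {
		return rule{}, false
	}
	var r rule
	for _, pair := range strings.Split(line[idx+len(prefix):], ",") {
		key, value, ok := strings.Cut(pair, "=")
		if !ok {
			continue
		}
		values := strings.Split(value, ";")
		switch strings.TrimSpace(key) {
		case "groups":
			for i, g := range values {
				if g == "core" {
					values[i] = "" // the core group appears as "" in role.yaml
				}
			}
			r.Groups = values
		case "resources":
			r.Resources = values
		case "verbs":
			r.Verbs = values
		}
	}
	return r, true
}

func main() {
	r, ok := parseRBACMarker("// +kubebuilder:rbac:groups=core,resources=pods,verbs=get;list;watch")
	fmt.Printf("%v %q %q %q\n", ok, r.Groups, r.Resources, r.Verbs)
}
```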
config/rbac/role.yaml: Our RBAC Role generated
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: manager-role
rules:
- apiGroups:
  - ""
  resources:
  - events
  verbs:
  - create
  - patch
- apiGroups:
  - ""
  resources:
  - pods
  verbs:
  - get
  - list
  - watch
- apiGroups:
  - apps
  resources:
  - deployments
  verbs:
  - create
  - delete
  - get
  - list
  - patch
  - update
  - watch
- apiGroups:
  - cache.example.com
  resources:
  - memcacheds
  verbs:
  - create
  - delete
  - get
  - list
  - patch
  - update
  - watch
- apiGroups:
  - cache.example.com
  resources:
  - memcacheds/finalizers
  verbs:
  - update
- apiGroups:
  - cache.example.com
  resources:
  - memcacheds/status
  verbs:
  - get
  - patch
  - update
Manager (main.go)
The Manager in cmd/main.go is responsible for managing the controllers in your application.
cmd/main.go: Our main.go
/*
Copyright 2025 The Kubernetes authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package main
import (
"crypto/tls"
"flag"
"os"
// Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.)
// to ensure that exec-entrypoint and run can make use of them.
_ "k8s.io/client-go/plugin/pkg/client/auth"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/healthz"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
"sigs.k8s.io/controller-runtime/pkg/metrics/filters"
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
"sigs.k8s.io/controller-runtime/pkg/webhook"
cachev1alpha1 "example.com/memcached/api/v1alpha1"
"example.com/memcached/internal/controller"
// +kubebuilder:scaffold:imports
)
var (
scheme = runtime.NewScheme()
setupLog = ctrl.Log.WithName("setup")
)
func init() {
utilruntime.Must(clientgoscheme.AddToScheme(scheme))
utilruntime.Must(cachev1alpha1.AddToScheme(scheme))
// +kubebuilder:scaffold:scheme
}
// nolint:gocyclo
func main() {
var metricsAddr string
var metricsCertPath, metricsCertName, metricsCertKey string
var webhookCertPath, webhookCertName, webhookCertKey string
var enableLeaderElection bool
var probeAddr string
var secureMetrics bool
var enableHTTP2 bool
var tlsOpts []func(*tls.Config)
flag.StringVar(&metricsAddr, "metrics-bind-address", "0", "The address the metrics endpoint binds to. "+
"Use :8443 for HTTPS or :8080 for HTTP, or leave as 0 to disable the metrics service.")
flag.StringVar(&probeAddr, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.")
flag.BoolVar(&enableLeaderElection, "leader-elect", false,
"Enable leader election for controller manager. "+
"Enabling this will ensure there is only one active controller manager.")
flag.BoolVar(&secureMetrics, "metrics-secure", true,
"If set, the metrics endpoint is served securely via HTTPS. Use --metrics-secure=false to use HTTP instead.")
flag.StringVar(&webhookCertPath, "webhook-cert-path", "", "The directory that contains the webhook certificate.")
flag.StringVar(&webhookCertName, "webhook-cert-name", "tls.crt", "The name of the webhook certificate file.")
flag.StringVar(&webhookCertKey, "webhook-cert-key", "tls.key", "The name of the webhook key file.")
flag.StringVar(&metricsCertPath, "metrics-cert-path", "",
"The directory that contains the metrics server certificate.")
flag.StringVar(&metricsCertName, "metrics-cert-name", "tls.crt", "The name of the metrics server certificate file.")
flag.StringVar(&metricsCertKey, "metrics-cert-key", "tls.key", "The name of the metrics server key file.")
flag.BoolVar(&enableHTTP2, "enable-http2", false,
"If set, HTTP/2 will be enabled for the metrics and webhook servers")
opts := zap.Options{
Development: true,
}
opts.BindFlags(flag.CommandLine)
flag.Parse()
ctrl.SetLogger(zap.New(zap.UseFlagOptions(&opts)))
// if the enable-http2 flag is false (the default), http/2 should be disabled
// due to its vulnerabilities. More specifically, disabling http/2 will
// prevent from being vulnerable to the HTTP/2 Stream Cancellation and
// Rapid Reset CVEs. For more information see:
// - https://github.com/advisories/GHSA-qppj-fm5r-hxr3
// - https://github.com/advisories/GHSA-4374-p667-p6c8
disableHTTP2 := func(c *tls.Config) {
setupLog.Info("disabling http/2")
c.NextProtos = []string{"http/1.1"}
}
if !enableHTTP2 {
tlsOpts = append(tlsOpts, disableHTTP2)
}
// Initial webhook TLS options
webhookTLSOpts := tlsOpts
webhookServerOptions := webhook.Options{
TLSOpts: webhookTLSOpts,
}
if len(webhookCertPath) > 0 {
setupLog.Info("Initializing webhook certificate watcher using provided certificates",
"webhook-cert-path", webhookCertPath, "webhook-cert-name", webhookCertName, "webhook-cert-key", webhookCertKey)
webhookServerOptions.CertDir = webhookCertPath
webhookServerOptions.CertName = webhookCertName
webhookServerOptions.KeyName = webhookCertKey
}
webhookServer := webhook.NewServer(webhookServerOptions)
// Metrics endpoint is enabled in 'config/default/kustomization.yaml'. The Metrics options configure the server.
// More info:
// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.22.1/pkg/metrics/server
// - https://book.kubebuilder.io/reference/metrics.html
metricsServerOptions := metricsserver.Options{
BindAddress: metricsAddr,
SecureServing: secureMetrics,
TLSOpts: tlsOpts,
}
if secureMetrics {
// FilterProvider is used to protect the metrics endpoint with authn/authz.
// These configurations ensure that only authorized users and service accounts
// can access the metrics endpoint. The RBAC are configured in 'config/rbac/kustomization.yaml'. More info:
// https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.22.1/pkg/metrics/filters#WithAuthenticationAndAuthorization
metricsServerOptions.FilterProvider = filters.WithAuthenticationAndAuthorization
}
// If the certificate is not specified, controller-runtime will automatically
// generate self-signed certificates for the metrics server. While convenient for development and testing,
// this setup is not recommended for production.
//
// TODO(user): If you enable certManager, uncomment the following lines:
// - [METRICS-WITH-CERTS] at config/default/kustomization.yaml to generate and use certificates
// managed by cert-manager for the metrics server.
// - [PROMETHEUS-WITH-CERTS] at config/prometheus/kustomization.yaml for TLS certification.
if len(metricsCertPath) > 0 {
setupLog.Info("Initializing metrics certificate watcher using provided certificates",
"metrics-cert-path", metricsCertPath, "metrics-cert-name", metricsCertName, "metrics-cert-key", metricsCertKey)
metricsServerOptions.CertDir = metricsCertPath
metricsServerOptions.CertName = metricsCertName
metricsServerOptions.KeyName = metricsCertKey
}
mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
Scheme: scheme,
Metrics: metricsServerOptions,
WebhookServer: webhookServer,
HealthProbeBindAddress: probeAddr,
LeaderElection: enableLeaderElection,
LeaderElectionID: "4b13cc52.example.com",
// LeaderElectionReleaseOnCancel defines if the leader should step down voluntarily
// when the Manager ends. This requires the binary to immediately end when the
// Manager is stopped, otherwise, this setting is unsafe. Setting this significantly
// speeds up voluntary leader transitions as the new leader don't have to wait
// LeaseDuration time first.
//
// In the default scaffold provided, the program ends immediately after
// the manager stops, so would be fine to enable this option. However,
// if you are doing or is intended to do any operation such as perform cleanups
// after the manager stops then its usage might be unsafe.
// LeaderElectionReleaseOnCancel: true,
})
if err != nil {
setupLog.Error(err, "unable to start manager")
os.Exit(1)
}
if err := (&controller.MemcachedReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "Memcached")
os.Exit(1)
}
// +kubebuilder:scaffold:builder
if err := mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil {
setupLog.Error(err, "unable to set up health check")
os.Exit(1)
}
if err := mgr.AddReadyzCheck("readyz", healthz.Ping); err != nil {
setupLog.Error(err, "unable to set up ready check")
os.Exit(1)
}
setupLog.Info("starting manager")
if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
setupLog.Error(err, "problem running manager")
os.Exit(1)
}
}
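Use Kubebuilder plugins to scaffold additional options
Now that you have a better understanding of how to create your own API and controller, let's add the autoupdate.kubebuilder.io/v1-alpha plugin to this project, so that it stays up to date with the scaffold changes of the latest Kubebuilder releases and adopts improvements from the ecosystem.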
kubebuilder edit --plugins="autoupdate/v1-alpha"
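Inspect the .github/workflows/auto-update.yml file to see how it works.
Checking the project running in the cluster
At this point, you can validate the project on the cluster by following the steps defined in the Quick Start; see: Run It On the Cluster
Next Steps
- To go deeper into developing your solution, consider going through the CronJob Tutorial
- For insights on optimizing your approach, refer to the Best Practices documentation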
