Karpenter Node Holder

26 Nov 2023
Author: Hennadii Stas, DevOps engineer at SHALB

Note: this article deals with Karpenter concepts prior to v0.32.0. The main issue discussed here is still being worked on in Karpenter’s GitHub repository, so the article’s idea can still be applied to v0.32.0; however, parameter and annotation names will be different.

Karpenter is a lightweight and efficient node autoscaler for Kubernetes. We opted to test it on our EKS clusters due to its excellent adaptability to dynamic environments and its smart approach to node provisioning. However, shortly after deploying it to production we encountered a challenge known as ‘node churn’—a common issue stemming from Karpenter’s operational specifics. In this article, I will detail how we successfully addressed this problem by creating our custom solution: the karpenter-node-holder controller.

What is Karpenter

Karpenter is a scalable and extensible node autoscaler for Kubernetes. It automatically provisions the right quantity and type of compute resources based on pod requirements, leading to more efficient cluster resource utilization and cost savings.

Karpenter offers several distinct benefits:

  • Flexibility and Extensibility: It is designed to be cloud provider-agnostic, allowing it to work across multiple cloud platforms and integrate with a diverse set of provisioning tools.
  • Simplified Configuration: With its streamlined configuration settings, Karpenter is easier to set up and manage, requiring fewer adjustments to achieve optimal scaling behaviors.
  • Responsive Scaling: Karpenter reacts quickly to changes in workload requirements, provisioning nodes in seconds as opposed to minutes, ensuring rapid adaptability in dynamic environments.
  • Efficient Resource Utilization: Instead of merely scaling based on unschedulable pods, Karpenter takes into account individual pod specifications, such as CPU, memory, and GPU requirements. This leads to more accurate node provisioning and reduces resource wastage.

The last two points were what drew us to try Karpenter on our EKS clusters. It reacts to events, such as new pod creations, instead of running periodic checks, which ensures an immediate response to scaling needs. Also, it doesn’t just provision any node; it finds the most appropriate and cost-effective one from the configured list. This promised an optimal fit for workloads, leading to more efficient resource use. However, soon after the deployment on a production cluster, this very approach introduced a slew of new challenges. Specifically, Karpenter’s fine-tuned provisioning can result in frequent node changes, leading to significant node churn and possible operational complexities.

Node Churn in Karpenter: A Closer Look

Karpenter’s dynamic approach to node scaling is undoubtedly efficient, but it can also lead to a phenomenon known as ‘node churn’. Imagine a cluster where, depending on the time of day, teams deploy jobs that demand the full capacity of several commonly-used node types. Karpenter is swift in provisioning these nodes in response. However, once the jobs, which typically last between 10-30 minutes, conclude, Karpenter is equally quick to decommission them.

A few jobs might need extended processing times, spanning a couple of hours. This situation creates a continuous ebb and flow of nodes, where new nodes are launched to accommodate jobs and old ones are removed post-completion. This relentless cycle of provisioning and deprovisioning can pose operational challenges.

Compounding this, Karpenter also factors in node costs, which can fluctuate, in its quest to always select the most cost-effective option. Thus, the combination of workload variability and dynamic cost considerations can lead Karpenter to perpetually reconfigure the cluster, resulting in considerable node churn.

Currently, automatic node deprovisioning based on Node utilization is controlled by one of two options in a Provisioner: either

spec:
  consolidation:
    enabled: true

or

spec:
  ttlSecondsAfterEmpty: 30

ttlSecondsAfterEmpty=X waits X seconds after a Node becomes completely empty (not counting DaemonSets) before decommissioning it.

consolidation.enabled=true will make decisions to deprovision non-empty nodes to consolidate their workload on already existing or newly spun up nodes. This is how we run Karpenter in our clusters.

Node Timeouts: A Potential Solution

The issue was described in Karpenter’s GitHub some time ago, and the proposed approach is to introduce consolidation timeouts: an internal consolidation controller in Karpenter that waits some time before taking consolidation actions. However, as there’s no clear timeline for the feature to be implemented, we decided to take a look at a similar approach with the tools we already had.

We thought of introducing node timeouts, where each newly provisioned node “safeguards” the whole cluster from immediate deprovisioning decisions for a predetermined duration, say “X” minutes.

Here’s the gist of the idea: Once Karpenter spins up a new node, the cluster enters a “protected” state for the duration of the timeout. During this time, regardless of workload changes or job completions, all nodes remain unaffected by downscaling decisions.

This protection, if working properly, should provide better stability and predictability, while greatly reducing node churn.

The feature that makes this implementation possible is Node annotations: Karpenter won’t touch nodes that carry the karpenter.sh/do-not-consolidate annotation (renamed to karpenter.sh/do-not-disrupt in v0.32). Annotations can be added to and removed from nodes dynamically at any point.

So we decided to write a controller that watches for node additions to the cluster, adds the annotation to all nodes, and removes it X minutes later, effectively pausing deprovisioning for that period.

Implementation

To automatically watch and react to node spin ups, we decided to write a controller in Golang that would be deployed on clusters using a Helm Chart. We called it karpenter-node-holder because it ‘holds’ nodes for a set amount of time.

Helm Chart

Let’s first describe Kubernetes resources that we need to deploy:

  1. Controller Deployment: a deployment that will run our program.
  2. ServiceAccount: for the deployment to get authorized for node event listening and node modifications within the cluster.
  3. ClusterRole: a set of permissions.
  4. ClusterRoleBinding: to assign the permissions to the ServiceAccount.

Deployment

apiVersion: apps/v1
kind: Deployment
metadata:
  name: {{ include "karpenter-node-holder.name" . }}-watcher
spec:
  replicas: 1
  selector:
    matchLabels:
      app: {{ include "karpenter-node-holder.name" . }}-watcher
  template:
    metadata:
      labels:
        app: {{ include "karpenter-node-holder.name" . }}-watcher
    spec:
      affinity:
        nodeAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
            nodeSelectorTerms:
            - matchExpressions:
              - key: karpenter.sh/provisioner-name
                operator: DoesNotExist
      nodeSelector:
        kubernetes.io/arch: arm64
      serviceAccountName: {{ .Release.Name }}-sa
      terminationGracePeriodSeconds: 120
      containers:
      - name: main
        image: {{ .Values.image }}
        imagePullPolicy: Always
        env:
        - name: HOLD_DURATION
          value: "{{ .Values.holdDuration }}"
        - name: HOLD_ANNOTATION
          value: "{{ .Values.holdAnnotation }}"
        - name: INITIAL_DELAY
          value: "{{ .Values.initialDelay }}"

ServiceAccount

apiVersion: v1
kind: ServiceAccount
metadata:
  name: {{ .Release.Name }}-sa

ClusterRole

apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: {{ .Release.Name }}-role
rules:
- apiGroups: [""]
  resources: ["nodes"]
  verbs: ["get", "list", "patch", "update", "watch"]

ClusterRoleBinding

apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: {{ .Release.Name }}-rolebinding
subjects:
- kind: ServiceAccount
  name: {{ .Release.Name }}-sa
  namespace: {{ .Release.Namespace }}
roleRef:
  kind: ClusterRole
  name: {{ .Release.Name }}-role
  apiGroup: rbac.authorization.k8s.io

values.yaml

image: <img_tag>

initialDelay: 60 # Time in seconds to wait before applying the annotation
holdDuration: 20 # Time in minutes to hold the annotation on nodes
holdAnnotation: "karpenter.sh/do-not-consolidate" # Change to "karpenter.sh/do-not-disrupt" in Karpenter v0.32.0

Code

Note: you can find code samples in the source repository.

First, we need to import the required packages:

package main

import (
        "context"
        "log"
        "os"
        "os/signal"
        "strconv"
        "sync"
        "syscall"
        "time"

        corev1 "k8s.io/api/core/v1"
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
        "k8s.io/apimachinery/pkg/runtime"
        "k8s.io/apimachinery/pkg/watch"
        "k8s.io/client-go/kubernetes"
        "k8s.io/client-go/rest"
        "k8s.io/client-go/tools/cache"
)
  • Standard libraries: Libraries such as context, log, os, and others provide foundational tools for logging, handling OS interactions, and synchronization.
  • Kubernetes packages: They offer tools to interact with the Kubernetes cluster, observe changes in resources, and manage Kubernetes object metadata.

Then let’s define some constants:

const (
        // Initial delay before annotating nodes
        INITIAL_DELAY = 30 * time.Second
        // Maximum number of retries before giving up on annotating or removing an annotation from a node
        MAX_RETRIES = 5
        // Delay between retries
        RETRY_DELAY = 5 * time.Second
)
  • INITIAL_DELAY: A small initial delay to allow a node to transition into an editable state.
  • MAX_RETRIES: Defines a number of retries on every Kubernetes API call.
  • RETRY_DELAY: A delay between each retry.

The main() function is a bit busy because of all the initializations, so let’s look at the functionally most important part:

      var mu sync.Mutex
      var timer *time.Timer
      var timerMutex sync.Mutex

      _, informer := cache.NewInformer(
              listWatcher,
              &corev1.Node{}, // Type of resource to watch
              time.Duration(0), // No resync
              cache.ResourceEventHandlerFuncs{
                      AddFunc: func(obj interface{}) {
                              node := obj.(*corev1.Node)
                              // Skip nodes that were already running when we started
                              if !recordedUIDs[string(node.UID)] {
                                      logger.Printf("Node added: %s\n", node.Name)

                                      time.Sleep(getInitialDelay())
                                      // Stop the timer and then add annotation and start a new timer
                                      pauseConsolidation(clientset, &mu, &timer, &timerMutex, logger)
                              }
                      },
                      DeleteFunc: func(obj interface{}) {
                              node := obj.(*corev1.Node)
                              logger.Printf("Node deleted: %s\n", node.Name)
                      },
              },
      )
  • Mutexes: mu and timerMutex are synchronization primitives. mu guards node modifications and timerMutex guards timer manipulation, so that goroutines that start simultaneously cannot interfere with each other.
  • NewInformer: The primary mechanism for our controller to monitor events. The informer watches for changes to Node resources within the Kubernetes cluster. Specifically:
    • AddFunc: Triggered when a new node is added to the cluster. If the node wasn’t running when the controller started (checked using recordedUIDs), it logs the addition, waits for the configured delay (getInitialDelay()), and then calls pauseConsolidation, which stops any running timer, annotates the nodes, and starts a new timer.
    • DeleteFunc: Invoked when a node is removed from the cluster. The function simply logs the node’s deletion.
      logger.Println("Created informer")
      // Start the informer to begin watching for changes
      stopCh := make(chan struct{})
      defer close(stopCh)

      go informer.Run(stopCh)
      logger.Println("Started informer")
  • stopCh: A channel that provides a mechanism to safely stop the informer when needed, ensuring a clean shutdown.

Alright, let’s dive into the pauseConsolidation function:

// Pause consolidation by adding an annotation to all nodes and starting a timer
// to remove the annotation after holdDuration minutes
//
// Parameters:
//
//      clientset: Kubernetes clientset
//      mu: Mutex to lock before modifying node annotations
//      timer: Pointer to the timer
//      timerMutex: Mutex to lock before updating the timer
//      logger: Logger
func pauseConsolidation(clientset *kubernetes.Clientset, mu *sync.Mutex, timer **time.Timer, timerMutex *sync.Mutex, logger *log.Logger) {
        holdAnnotation := os.Getenv("HOLD_ANNOTATION")
        mu.Lock()
        defer mu.Unlock()

        timerMutex.Lock()
        defer timerMutex.Unlock()

        if *timer != nil && (*timer).Stop() {
                // Drain the timer's channel if it's still active
                select {
                case <-(*timer).C:
                default:
                }
                logger.Printf("Stopping previous timer\n")
        }
        // Annotate all nodes with "karpenter.sh/do-not-consolidate=true"
        logger.Printf("Adding annotation %s to all nodes\n", holdAnnotation)
        annotateNodes(clientset, listNodes(clientset, logger), holdAnnotation, logger)

        // Start the timer to remove the annotation after holdDuration minutes
        logger.Printf("Starting timer to remove annotation in %v\n", getHoldDuration(logger))
        *timer = time.AfterFunc(getHoldDuration(logger), func() {
                mu.Lock()
                defer mu.Unlock()

                logger.Println("Removing annotation from all nodes")

                // Remove the annotation from all nodes
                removeAnnotationFromNodes(clientset, listNodes(clientset, logger), holdAnnotation, logger)
        })
}

We start by reading the annotation name from the environment variable that we pass in the Deployment spec. Then we lock our mutexes and stop the previous timer if it’s still active, draining its channel. This is needed because when a new Node is added to the cluster, we want to restart the hold period instead of letting the existing timer run out.

We annotate nodes (see below) and schedule the delayed annotation removal with time.AfterFunc().

// Annotate all nodes with the given annotation
//
// Parameters:
//
//      clientset: Kubernetes clientset
//      nodeList: List of nodes
//      holdAnnotation: Annotation to add
//      logger: Logger
func annotateNodes(clientset *kubernetes.Clientset, nodeList *corev1.NodeList, holdAnnotation string, logger *log.Logger) {
        for _, node := range nodeList.Items {
                retryCount := 0
                for {
                        if node.Annotations == nil {
                                node.Annotations = make(map[string]string)
                        }
                        if node.Annotations[holdAnnotation] == "true" {
                                logger.Printf("Node %s already annotated, skipping\n", node.Name)
                                break
                        }
                        node.Annotations[holdAnnotation] = "true"
                        _, err := clientset.CoreV1().Nodes().Update(context.TODO(), &node, metav1.UpdateOptions{})
                        if err == nil {
                                logger.Printf("Successfully annotated node %s\n", node.Name)
                                break
                        } else {
                                logger.Printf("Error annotating node %s: %v\n", node.Name, err)
                                if retryCount >= MAX_RETRIES {
                                        logger.Printf("Max retries reached, giving up on node %s\n", node.Name)
                                        break
                                }

                                time.Sleep(RETRY_DELAY)
                                retryCount++

                                updatedNode, err := clientset.CoreV1().Nodes().Get(context.TODO(), node.Name, metav1.GetOptions{})
                                if err != nil {
                                        logger.Printf("Error updating node info %s: %v\n", node.Name, err)
                                        break
                                }
                                node = *updatedNode
                        }
                }
        }
}

Although this function looks a bit complex because of all of these conditions, what it does is really simple: if a node doesn’t have a required annotation, we add it, and if we encounter any errors, we attempt a number of retries before giving up.

The function gets a list of nodes that we can pass directly to clientset.CoreV1().Nodes().Update() after we make changes to them. If an update fails, we re-fetch the problematic node with clientset.CoreV1().Nodes().Get() to refresh its state before retrying.

// Remove the annotation from all nodes
//
// Parameters:
//
//      clientset: Kubernetes clientset
//      nodeList: List of nodes
//      holdAnnotation: Annotation to remove
//      logger: Logger
func removeAnnotationFromNodes(clientset *kubernetes.Clientset, nodeList *corev1.NodeList, holdAnnotation string, logger *log.Logger) {
        for _, node := range nodeList.Items {
                retryCount := 0
                for {
                        if node.Annotations == nil || node.Annotations[holdAnnotation] != "true" {
                                logger.Printf("Node %s doesn't have the annotation, skipping\n", node.Name)
                                break
                        }
                        delete(node.Annotations, holdAnnotation)
                        _, err := clientset.CoreV1().Nodes().Update(context.TODO(), &node, metav1.UpdateOptions{})
                        if err == nil {
                                logger.Printf("Successfully removed annotation from node %s\n", node.Name)
                                break
                        } else {
                                logger.Printf("Error removing annotation from node %s: %v\n", node.Name, err)
                                if retryCount >= MAX_RETRIES {
                                        logger.Printf("Max retries reached, giving up on node %s\n", node.Name)
                                        break
                                }

                                time.Sleep(RETRY_DELAY)
                                retryCount++

                                updatedNode, err := clientset.CoreV1().Nodes().Get(context.TODO(), node.Name, metav1.GetOptions{})
                                if err != nil {
                                        logger.Printf("Error updating node info %s: %v\n", node.Name, err)
                                        break
                                }
                                node = *updatedNode
                        }
                }
        }
}

Really similar to the previous one, only for removal.
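
As a possible alternative to the Get/Update cycle used in both functions, the same change can be expressed as a JSON merge patch (RFC 7386). This is not what the controller does; it is a sketch of a different technique, and annotationPatch is a hypothetical helper. The example only builds the patch body, so it runs without a cluster.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// annotationPatch builds a JSON merge patch (RFC 7386) that sets the given
// node annotation to *value, or removes the annotation when value is nil.
func annotationPatch(key string, value *string) ([]byte, error) {
	patch := map[string]interface{}{
		"metadata": map[string]interface{}{
			"annotations": map[string]interface{}{
				// A nil pointer is encoded as JSON null, which deletes the key.
				key: value,
			},
		},
	}
	return json.Marshal(patch)
}

func main() {
	key := "karpenter.sh/do-not-consolidate"
	hold := "true"

	add, _ := annotationPatch(key, &hold)
	remove, _ := annotationPatch(key, nil)

	fmt.Println(string(add))
	fmt.Println(string(remove))
}
```

With client-go, such a body could be sent with clientset.CoreV1().Nodes().Patch(ctx, nodeName, types.MergePatchType, patch, metav1.PatchOptions{}), where types is k8s.io/apimachinery/pkg/types. A patch like this carries no resourceVersion, so it should not fail just because something else modified the node in the meantime, and the ClusterRole above already grants the patch verb.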

Other helper functions are self-explanatory and can be viewed in source.

Building a Docker image

FROM golang:1.21.1-alpine as builder

WORKDIR /app
COPY main.go go.mod go.sum ./
RUN go mod download
RUN CGO_ENABLED=0 GOOS=linux go build -o karpenter-node-holder main.go

FROM alpine:3.14.2

COPY --from=builder /app/karpenter-node-holder .

ENV HOLD_DURATION=15
ENV HOLD_ANNOTATION="karpenter.sh/do-not-consolidate"

CMD ["./karpenter-node-holder"]

We’re using a separate builder image to make the resulting image smaller. We copy source files, download the required packages, and build a binary.

In the second stage we use a lightweight Alpine image as a base. We copy the binary and set env defaults just in case. We also specify the default command to run when the container starts.

Conclusion

After integrating this controller into our clusters last month, we’ve observed notable improvements. The incessant node churn we previously faced has significantly diminished. This has not only led to fewer restarts of our services, ensuring smoother operations, but also translated into some cost savings, as the clusters consume less traffic now that they no longer constantly pull images onto new nodes.

Implementing this solution has certainly proved its worth for our Kubernetes environment, and we’re optimistic about its potential benefits for others facing the issue (at least until Karpenter adds a similar feature itself). Version 0.32.0 introduced a new format to configure consolidation (or disruption), which makes me think the feature to add delays to deprovisioning decisions is on the horizon.
