1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86
| import ( v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" "k8s.io/client-go/tools/leaderelection" "k8s.io/client-go/tools/leaderelection/resourcelock" )
func main() { envKubeConfigPath := os.Getenv("KUBECONFIG") if envKubeConfigPath == "" { klog.Fatal("KUBECONFIG env must be set for kubeconfig file path") } leaseNamespace := os.Getenv("LEASE_NAMESPACE") if leaseNamespace == "" { klog.Fatal("LEASE_NAMESPACE must be set for leader election") } config, err := clientcmd.BuildConfigFromFlags("", envKubeConfigPath) if err != nil { klog.Fatal(err) } clientset, err := kubernetes.NewForConfig(config) if err != nil { klog.Fatal(err) } inClusterConfig, err := rest.InClusterConfig() if err != nil { klog.Fatal(err) } inClusterClient, err := kubernetes.NewForConfig(inClusterConfig) if err != nil { klog.Fatal(err) }
ctx, cancel := context.WithCancel(context.Background()) sigc := make(chan os.Signal, 1) signal.Notify(sigc, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT) go func() { <-sigc cancel() }()
id := uuid.New().String() klog.Infof("ResourceLock identity id: %s", id) lock := &resourcelock.LeaseLock{ LeaseMeta: metav1.ObjectMeta{ Name: "k8s-events-exporter", Namespace: leaseNamespace, }, Client: inClusterClient.CoordinationV1(), LockConfig: resourcelock.ResourceLockConfig{ Identity: id, }, }
klog.Infof("LeaderElectionConfig are: LeaseDuration %v second, RenewDeadline %v second, RetryPeriod %v second.", leaseDuration, renewDeadline, retryPeriod) subCtx, subCancel := context.WithCancel(ctx) leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{ Lock: lock, ReleaseOnCancel: true, LeaseDuration: leaseDuration * time.Second, RenewDeadline: renewDeadline * time.Second, RetryPeriod: retryPeriod * time.Second, Callbacks: leaderelection.LeaderCallbacks{ OnStartedLeading: func(ctx context.Context) { run(subCtx, clientset) }, OnStoppedLeading: func() { klog.Warningf("Leader %s lost", id) subCancel() }, OnNewLeader: func(identity string) { if identity == id { klog.Infof("Acquired the lock %s", identity) } else { klog.Infof("Leader is %v for the moment", identity) } }, }, }) }
|