sijie closed pull request #2856: Added ability for the kubernetes to poll a 
configmap to look out for changes
URL: https://github.com/apache/pulsar/pull/2856
 
 
   

This is a PR merged from a forked repository.
Because GitHub hides the original diff once a pull request from a fork
is merged, the diff is reproduced below for the sake of provenance:

diff --git 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java
 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java
index 1a180aee0c..c6d5d02645 100644
--- 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java
+++ 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java
@@ -24,13 +24,20 @@
 import io.kubernetes.client.Configuration;
 import io.kubernetes.client.apis.AppsV1Api;
 import io.kubernetes.client.apis.CoreV1Api;
+import io.kubernetes.client.models.V1ConfigMap;
 import io.kubernetes.client.util.Config;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.functions.instance.AuthenticationConfig;
 import org.apache.pulsar.functions.instance.InstanceConfig;
 import org.apache.pulsar.functions.proto.Function;
 
+import java.lang.reflect.Field;
 import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
 
 import static org.apache.commons.lang3.StringUtils.isEmpty;
 
@@ -40,24 +47,33 @@
 @Slf4j
 public class KubernetesRuntimeFactory implements RuntimeFactory {
 
-    private final String k8Uri;
-    private final String jobNamespace;
-    private final String pulsarDockerImageName;
-    private final String pulsarRootDir;
+    @Getter
+    @Setter
+    @NoArgsConstructor
+    class KubernetesInfo {
+        private String k8Uri;
+        private String jobNamespace;
+        private String pulsarDockerImageName;
+        private String pulsarRootDir;
+        private String pulsarAdminUrl;
+        private String pulsarServiceUrl;
+        private String pythonDependencyRepository;
+        private String pythonExtraDependencyRepository;
+        private String changeConfigMap;
+        private String changeConfigMapNamespace;
+    }
+    private final KubernetesInfo kubernetesInfo;
     private final Boolean submittingInsidePod;
     private final Boolean installUserCodeDependencies;
-    private final String pythonDependencyRepository;
-    private final String pythonExtraDependencyRepository;
     private final Map<String, String> customLabels;
-    private final String pulsarAdminUri;
-    private final String pulsarServiceUri;
+    private final Integer expectedMetricsCollectionInterval;
     private final String stateStorageServiceUri;
     private final AuthenticationConfig authConfig;
     private final String javaInstanceJarFile;
     private final String pythonInstanceFile;
     private final String prometheusMetricsServerJarFile;
     private final String logDirectory = "logs/functions";
-    private final Integer expectedMetricsInterval;
+    private Timer changeConfigMapTimer;
     private AppsV1Api appsClient;
     private CoreV1Api coreClient;
 
@@ -75,36 +91,41 @@ public KubernetesRuntimeFactory(String k8Uri,
                                     String pulsarAdminUri,
                                     String stateStorageServiceUri,
                                     AuthenticationConfig authConfig,
-                                    Integer expectedMetricsInterval) {
-        this.k8Uri = k8Uri;
+                                    Integer expectedMetricsCollectionInterval,
+                                    String changeConfigMap,
+                                    String changeConfigMapNamespace) {
+        this.kubernetesInfo = new KubernetesInfo();
+        this.kubernetesInfo.setK8Uri(k8Uri);
         if (!isEmpty(jobNamespace)) {
-            this.jobNamespace = jobNamespace;
+            this.kubernetesInfo.setJobNamespace(jobNamespace);
         } else {
-            this.jobNamespace = "default";
+            this.kubernetesInfo.setJobNamespace("default");
         }
         if (!isEmpty(pulsarDockerImageName)) {
-            this.pulsarDockerImageName = pulsarDockerImageName;
+            
this.kubernetesInfo.setPulsarDockerImageName(pulsarDockerImageName);
         } else {
-            this.pulsarDockerImageName = "apachepulsar/pulsar";
+            
this.kubernetesInfo.setPulsarDockerImageName("apachepulsar/pulsar");
         }
         if (!isEmpty(pulsarRootDir)) {
-            this.pulsarRootDir = pulsarRootDir;
+            this.kubernetesInfo.setPulsarRootDir(pulsarRootDir);
         } else {
-            this.pulsarRootDir = "/pulsar";
+            this.kubernetesInfo.setPulsarRootDir("/pulsar");
         }
+        
this.kubernetesInfo.setPythonDependencyRepository(pythonDependencyRepository);
+        
this.kubernetesInfo.setPythonExtraDependencyRepository(pythonExtraDependencyRepository);
+        this.kubernetesInfo.setPulsarServiceUrl(pulsarServiceUri);
+        this.kubernetesInfo.setPulsarAdminUrl(pulsarAdminUri);
+        this.kubernetesInfo.setChangeConfigMap(changeConfigMap);
+        
this.kubernetesInfo.setChangeConfigMapNamespace(changeConfigMapNamespace);
         this.submittingInsidePod = submittingInsidePod;
         this.installUserCodeDependencies = installUserCodeDependencies;
-        this.pythonDependencyRepository = pythonDependencyRepository;
-        this.pythonExtraDependencyRepository = pythonExtraDependencyRepository;
         this.customLabels = customLabels;
-        this.pulsarServiceUri = pulsarServiceUri;
-        this.pulsarAdminUri = pulsarAdminUri;
         this.stateStorageServiceUri = stateStorageServiceUri;
         this.authConfig = authConfig;
-        this.javaInstanceJarFile = this.pulsarRootDir + 
"/instances/java-instance.jar";
-        this.pythonInstanceFile = this.pulsarRootDir + 
"/instances/python-instance/python_instance_main.py";
-        this.prometheusMetricsServerJarFile = this.pulsarRootDir + 
"/instances/PrometheusMetricsServer.jar";
-        this.expectedMetricsInterval = expectedMetricsInterval == null ? -1 : 
expectedMetricsInterval;
+        this.javaInstanceJarFile = this.kubernetesInfo.getPulsarRootDir() + 
"/instances/java-instance.jar";
+        this.pythonInstanceFile = this.kubernetesInfo.getPulsarRootDir() + 
"/instances/python-instance/python_instance_main.py";
+        this.prometheusMetricsServerJarFile = 
this.kubernetesInfo.getPulsarRootDir() + 
"/instances/PrometheusMetricsServer.jar";
+        this.expectedMetricsCollectionInterval = 
expectedMetricsCollectionInterval == null ? -1 : 
expectedMetricsCollectionInterval;
     }
 
     @Override
@@ -131,24 +152,24 @@ public KubernetesRuntime createContainer(InstanceConfig 
instanceConfig, String c
         return new KubernetesRuntime(
             appsClient,
             coreClient,
-            jobNamespace,
+            this.kubernetesInfo.getJobNamespace(),
             customLabels,
             installUserCodeDependencies,
-            pythonDependencyRepository,
-            pythonExtraDependencyRepository,
-            pulsarDockerImageName,
-            pulsarRootDir,
+            this.kubernetesInfo.getPythonDependencyRepository(),
+            this.kubernetesInfo.getPythonExtraDependencyRepository(),
+            this.kubernetesInfo.getPulsarDockerImageName(),
+            this.kubernetesInfo.getPulsarRootDir(),
             instanceConfig,
             instanceFile,
             prometheusMetricsServerJarFile,
             logDirectory,
             codePkgUrl,
             originalCodeFileName,
-            pulsarServiceUri,
-            pulsarAdminUri,
+            this.kubernetesInfo.getPulsarServiceUrl(),
+            this.kubernetesInfo.getPulsarAdminUrl(),
             stateStorageServiceUri,
             authConfig,
-            expectedMetricsInterval);
+                expectedMetricsCollectionInterval);
     }
 
     @Override
@@ -163,7 +184,7 @@ public void doAdmissionChecks(Function.FunctionDetails 
functionDetails) {
     @VisibleForTesting
     void setupClient() throws Exception {
         if (appsClient == null) {
-            if (k8Uri == null) {
+            if (this.kubernetesInfo.getK8Uri() == null) {
                 log.info("k8Uri is null thus going by defaults");
                 ApiClient cli;
                 if (submittingInsidePod) {
@@ -177,11 +198,44 @@ void setupClient() throws Exception {
                 appsClient = new AppsV1Api();
                 coreClient = new CoreV1Api();
             } else {
-                log.info("Setting up k8Client using uri " + k8Uri);
-                final ApiClient apiClient = new ApiClient().setBasePath(k8Uri);
+                log.info("Setting up k8Client using uri " + 
this.kubernetesInfo.getK8Uri());
+                final ApiClient apiClient = new 
ApiClient().setBasePath(this.kubernetesInfo.getK8Uri());
                 appsClient = new AppsV1Api(apiClient);
                 coreClient = new CoreV1Api(apiClient);
             }
+
+            // Setup a timer to change stuff.
+            if (!isEmpty(this.kubernetesInfo.getChangeConfigMap())) {
+                changeConfigMapTimer = new Timer();
+                changeConfigMapTimer.scheduleAtFixedRate(new TimerTask() {
+                    @Override
+                    public void run() {
+                        fetchConfigMap();
+                    }
+                }, 300000, 300000);
+            }
+        }
+    }
+
+    void fetchConfigMap() {
+        try {
+            V1ConfigMap v1ConfigMap = 
coreClient.readNamespacedConfigMap(kubernetesInfo.getChangeConfigMap(), 
kubernetesInfo.getChangeConfigMapNamespace(), null, true, false);
+            Map<String, String> data = v1ConfigMap.getData();
+            if (data != null) {
+                overRideKubernetesConfig(data);
+            }
+        } catch (Exception e) {
+            log.error("Error while trying to fetch configmap {} at namespace 
{}", kubernetesInfo.getChangeConfigMap(), 
kubernetesInfo.getChangeConfigMapNamespace(), e);
+        }
+    }
+
+    void overRideKubernetesConfig(Map<String, String> data) throws Exception {
+        for (Field field : KubernetesInfo.class.getDeclaredFields()) {
+            field.setAccessible(true);
+            if (data.containsKey(field.getName()) && 
!data.get(field.getName()).equals(field.get(kubernetesInfo))) {
+                log.info("Kubernetes Config {} changed from {} to {}", 
field.getName(), field.get(kubernetesInfo), data.get(field.getName()));
+                field.set(kubernetesInfo, data.get(field.getName()));
+            }
         }
     }
 }
diff --git 
a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java
 
b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java
index e82c75febf..02886604c5 100644
--- 
a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java
+++ 
b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java
@@ -72,7 +72,9 @@ public KubernetesRuntimeTest() throws Exception {
         this.stateStorageServiceUrl = "bk://localhost:4181";
         this.logDirectory = "logs/functions";
         this.factory = spy(new KubernetesRuntimeFactory(null, null, null, 
pulsarRootDir,
-            false, true, "myrepo", "anotherrepo",  null, pulsarServiceUrl, 
pulsarAdminUrl, stateStorageServiceUrl, null, null));
+            false, true, "myrepo", "anotherrepo",
+                null, pulsarServiceUrl, pulsarAdminUrl, 
stateStorageServiceUrl, null,
+                null, null, null));
         doNothing().when(this.factory).setupClient();
     }
 
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
index f7dd4bc07e..d8bf252905 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
@@ -140,7 +140,9 @@ public FunctionRuntimeManager(WorkerConfig workerConfig, 
WorkerService workerSer
                     
StringUtils.isEmpty(workerConfig.getKubernetesContainerFactory().getPulsarAdminUrl())
 ? workerConfig.getPulsarWebServiceUrl() : 
workerConfig.getKubernetesContainerFactory().getPulsarAdminUrl(),
                     workerConfig.getStateStorageServiceUrl(),
                     authConfig,
-                    
workerConfig.getKubernetesContainerFactory().getExpectedMetricsCollectionInterval()
 == null ? -1 : 
workerConfig.getKubernetesContainerFactory().getExpectedMetricsCollectionInterval());
+                    
workerConfig.getKubernetesContainerFactory().getExpectedMetricsCollectionInterval()
 == null ? -1 : 
workerConfig.getKubernetesContainerFactory().getExpectedMetricsCollectionInterval(),
+                    
workerConfig.getKubernetesContainerFactory().getChangeConfigMap(),
+                    
workerConfig.getKubernetesContainerFactory().getChangeConfigMapNamespace());
         } else {
             throw new RuntimeException("Either Thread, Process or Kubernetes 
Container Factory need to be set");
         }
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
index 2b3e816a79..587f3117bf 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
@@ -144,6 +144,11 @@
         private String pythonExtraDependencyRepository;
         private Map<String, String> customLabels;
         private Integer expectedMetricsCollectionInterval;
+        // Kubernetes Runtime will periodically checkback on
+        // this configMap if defined and if there are any changes
+        // to the kubernetes specific stuff, we apply those changes
+        private String changeConfigMap;
+        private String changeConfigMapNamespace;
     }
     private KubernetesContainerFactory kubernetesContainerFactory;
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to