tillrohrmann commented on a change in pull request #15385:
URL: https://github.com/apache/flink/pull/15385#discussion_r602492105



##########
File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/FlinkKubeClientFactory.java
##########
@@ -83,11 +89,19 @@ public FlinkKubeClient fromConfiguration(Configuration flinkConfig) {
         LOG.debug("Setting namespace of Kubernetes client to {}", namespace);
         config.setNamespace(namespace);
 
+        final Tuple2<String, String> cacheKey = Tuple2.of(config.getMasterUrl(), namespace);
+        if (CACHE.containsKey(cacheKey)) {
+            return CACHE.get(cacheKey);
+        }
+
         final NamespacedKubernetesClient client = new DefaultKubernetesClient(config);
         final int poolSize =
                 flinkConfig.get(KubernetesConfigOptions.KUBERNETES_CLIENT_IO_EXECUTOR_POOL_SIZE);
-        return new Fabric8FlinkKubeClient(
-                flinkConfig, client, () -> createThreadPoolForAsyncIO(poolSize));
+        final FlinkKubeClient flinkKubeClient =
+                new Fabric8FlinkKubeClient(
+                        flinkConfig, client, () -> createThreadPoolForAsyncIO(poolSize));
+        CACHE.put(cacheKey, flinkKubeClient);
+        return flinkKubeClient;

Review comment:
       Is it a good idea to introduce a cache for objects that are not owned 
by this factory? I think a user of the `FlinkKubeClient` can now call 
`FlinkKubeClient.close` and thereby leave a closed client in the cache, which a 
later `fromConfiguration` call with the same master URL and namespace would 
then return.
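
To make the concern concrete, here is a minimal sketch of the failure mode. 
`DemoClient` and `DemoFactory` are hypothetical stand-ins written for this 
comment, not Flink classes, and the key string is made up. The sketch only 
assumes what the diff shows: the factory caches by key and is never told when 
a caller closes the returned object.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-ins for illustration only; these are not Flink classes.
final class CachedClientHazard {

    /** Minimal closeable client that remembers whether it was closed. */
    static final class DemoClient implements AutoCloseable {
        private volatile boolean closed;

        boolean isClosed() {
            return closed;
        }

        @Override
        public void close() {
            closed = true;
        }
    }

    /** Factory that caches clients by key but never learns when a caller closes one. */
    static final class DemoFactory {
        private final Map<String, DemoClient> cache = new ConcurrentHashMap<>();

        DemoClient fromKey(String key) {
            // Returns the cached instance if present, even if it was closed meanwhile.
            return cache.computeIfAbsent(key, k -> new DemoClient());
        }
    }

    public static void main(String[] args) {
        DemoFactory factory = new DemoFactory();

        // First caller obtains a client and, as its owner, closes it when done.
        DemoClient first = factory.fromKey("master-url/namespace");
        first.close();

        // Second caller asks for the same key and receives the closed instance.
        DemoClient second = factory.fromKey("master-url/namespace");
        System.out.println("same instance: " + (first == second));
        System.out.println("second is closed: " + second.isClosed());
    }
}
```

Either the factory would have to own the clients' lifecycle (for example by 
evicting on close), or the callers would have to stop closing them; the sketch 
does not pick one of the two.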

##########
File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/FlinkKubeClientFactory.java
##########
@@ -87,12 +89,23 @@ public FlinkKubeClient fromConfiguration(Configuration flinkConfig, Executor ioE
         LOG.debug("Setting namespace of Kubernetes client to {}", namespace);
         config.setNamespace(namespace);
 
-        final NamespacedKubernetesClient client = new DefaultKubernetesClient(config);
+        final Tuple2<String, String> cacheKey = Tuple2.of(config.getMasterUrl(), namespace);
+        if (CACHE.containsKey(cacheKey)) {
+            return CACHE.get(cacheKey);
+        }
 
-        return new Fabric8FlinkKubeClient(flinkConfig, client, () -> ioExecutor);
+        final NamespacedKubernetesClient client = new DefaultKubernetesClient(config);
+        final int poolSize =
+                flinkConfig.get(KubernetesConfigOptions.KUBERNETES_CLIENT_IO_EXECUTOR_POOL_SIZE);
+        final FlinkKubeClient flinkKubeClient =
+                new Fabric8FlinkKubeClient(
+                        flinkConfig, client, () -> createThreadPoolForAsyncIO(poolSize));
+        CACHE.put(cacheKey, flinkKubeClient);
+        return flinkKubeClient;
     }
 
-    private static Executor createThreadPoolForAsyncIO() {
-        return Executors.newFixedThreadPool(2, new ExecutorThreadFactory("FlinkKubeClient-IO"));
+    private static ExecutorService createThreadPoolForAsyncIO(int poolSize) {
+        return Executors.newFixedThreadPool(
+                poolSize, new ExecutorThreadFactory("flink-kubeclient-io"));

Review comment:
       If we create multiple `FlinkKubeClients`, then we might want to give 
their thread pools different names, so that their threads can be told apart.
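
One possible shape for this, sketched with plain JDK classes only. 
`NamedIoPools`, `nextPoolName` and `namedFactory` are hypothetical names made 
up for this comment, and the `-thread-N` suffix is an arbitrary choice of the 
sketch; the PR itself uses Flink's `ExecutorThreadFactory`, which is not used 
here.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper for illustration only; not part of Flink.
final class NamedIoPools {

    // Counts the pools created in this JVM so each one gets a distinct name.
    private static final AtomicInteger POOL_COUNTER = new AtomicInteger();

    /** Returns prefix-1, prefix-2, ... on successive calls. */
    static String nextPoolName(String prefix) {
        return prefix + "-" + POOL_COUNTER.incrementAndGet();
    }

    /** Thread factory that names its threads poolName-thread-1, poolName-thread-2, ... */
    static ThreadFactory namedFactory(String poolName) {
        AtomicInteger threadCounter = new AtomicInteger();
        return runnable -> {
            Thread thread =
                    new Thread(runnable, poolName + "-thread-" + threadCounter.incrementAndGet());
            thread.setDaemon(true);
            return thread;
        };
    }

    static ExecutorService createThreadPoolForAsyncIO(int poolSize, String poolName) {
        return Executors.newFixedThreadPool(poolSize, namedFactory(poolName));
    }

    public static void main(String[] args) throws Exception {
        ExecutorService first = createThreadPoolForAsyncIO(1, nextPoolName("flink-kubeclient-io"));
        ExecutorService second = createThreadPoolForAsyncIO(1, nextPoolName("flink-kubeclient-io"));
        Callable<String> whoAmI = () -> Thread.currentThread().getName();
        try {
            // Each pool reports a thread name that carries its own pool index.
            System.out.println(first.submit(whoAmI).get());
            System.out.println(second.submit(whoAmI).get());
        } finally {
            first.shutdown();
            second.shutdown();
        }
    }
}
```

A per-JVM counter is the simplest option; deriving the name from the cache key 
(master URL and namespace) would be more descriptive but needs sanitizing.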





