tillrohrmann commented on a change in pull request #15385:
URL: https://github.com/apache/flink/pull/15385#discussion_r604283630



##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.java
##########
@@ -70,17 +80,30 @@
 
     private static final Logger LOG = 
LoggerFactory.getLogger(Fabric8FlinkKubeClient.class);
 
+    /**
+     * Cache for Flink kubernetes clients. Key is master URL and namespace. 
Value is
+     * Fabric8FlinkKubeClient and reference count.
+     */
+    @GuardedBy("LOCK")
+    private static final Map<Tuple2<String, String>, 
Tuple2<Fabric8FlinkKubeClient, Integer>>
+            CACHE = new HashMap<>();
+    /** Lock used to protect modification of CACHE. */
+    private static final Object LOCK = new Object();

Review comment:
       Do we really need this complexity? What do we gain by it? We save 
instantiating 4 * x threads, right?

##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.java
##########
@@ -70,17 +80,30 @@
 
     private static final Logger LOG = 
LoggerFactory.getLogger(Fabric8FlinkKubeClient.class);
 
+    /**
+     * Cache for Flink kubernetes clients. Key is master URL and namespace. 
Value is
+     * Fabric8FlinkKubeClient and reference count.
+     */
+    @GuardedBy("LOCK")
+    private static final Map<Tuple2<String, String>, 
Tuple2<Fabric8FlinkKubeClient, Integer>>
+            CACHE = new HashMap<>();
+    /** Lock used to protect modification of CACHE. */
+    private static final Object LOCK = new Object();
+
     private final NamespacedKubernetesClient internalClient;
     private final String clusterId;
     private final String namespace;
     private final int maxRetryAttempts;
 
-    private final Executor kubeClientExecutorService;
+    private final ExecutorService kubeClientExecutorService;
+
+    /* F0 is master URL, and F1 is namespace. */
+    private final Tuple2<String, String> cacheKey;

Review comment:
       Let's not use `TupleX`. I think this class is very inexpressive.

##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.java
##########
@@ -342,7 +367,22 @@ public KubernetesWatch watchConfigMaps(
 
     @Override
     public void close() {
+        synchronized (LOCK) {
+            final Tuple2<Fabric8FlinkKubeClient, Integer> cachedClient = 
CACHE.get(cacheKey);
+            if (cachedClient != null) {
+                if (--cachedClient.f1 == 0) {
+                    cachedClient.f0.internalClose();
+                    CACHE.remove(cacheKey);
+                }
+            } else {
+                internalClose();
+            }

Review comment:
       This case distinction also shows that there is something not right. 
Because we can either directly instantiate `Fabric8FlinkKubeClient` or use the 
`get` method which registers a client at the `CACHE` we need this special 
casing here.

##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.java
##########
@@ -354,6 +394,29 @@ public KubernetesPod loadPodFromTemplateFile(File file) {
         return new KubernetesPod(this.internalClient.pods().load(file).get());
     }
 
+    public static FlinkKubeClient get(Configuration flinkConfig, Config 
kubeConfig) {
+        synchronized (LOCK) {
+            final Tuple2<String, String> cacheKey =
+                    Tuple2.of(kubeConfig.getMasterUrl(), 
kubeConfig.getNamespace());
+            if (CACHE.containsKey(cacheKey)) {
+                CACHE.get(cacheKey).f1++;
+                return CACHE.get(cacheKey).f0;
+            }
+
+            final NamespacedKubernetesClient client = new 
DefaultKubernetesClient(kubeConfig);
+            final int poolSize =
+                    flinkConfig.get(
+                            
KubernetesConfigOptions.KUBERNETES_CLIENT_IO_EXECUTOR_POOL_SIZE);
+            final ExecutorService executorService =
+                    Executors.newFixedThreadPool(
+                            poolSize, new 
ExecutorThreadFactory("flink-kubeclient-io@" + cacheKey));
+            final Fabric8FlinkKubeClient flinkKubeClient =
+                    new Fabric8FlinkKubeClient(flinkConfig, client, () -> 
executorService);
+            CACHE.put(cacheKey, Tuple2.of(flinkKubeClient, 1));
+            return flinkKubeClient;
+        }
+    }

Review comment:
       I would first try how it performs without the `CACHE`. I am sceptical 
whether this is really needed.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to