tillrohrmann commented on a change in pull request #15385:
URL: https://github.com/apache/flink/pull/15385#discussion_r604283630
##########
File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.java
##########
@@ -70,17 +80,30 @@
     private static final Logger LOG = LoggerFactory.getLogger(Fabric8FlinkKubeClient.class);

+    /**
+     * Cache for Flink kubernetes clients. Key is master URL and namespace. Value is
+     * Fabric8FlinkKubeClient and reference count.
+     */
+    @GuardedBy("LOCK")
+    private static final Map<Tuple2<String, String>, Tuple2<Fabric8FlinkKubeClient, Integer>>
+            CACHE = new HashMap<>();
+    /** Lock used to protect modification of CACHE. */
+    private static final Object LOCK = new Object();

Review comment:
Do we really need this complexity? What do we gain by it? We save instantiating 4 * x threads (x being the number of clients), right?

##########
File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.java
##########
@@ -70,17 +80,30 @@
     private static final Logger LOG = LoggerFactory.getLogger(Fabric8FlinkKubeClient.class);

+    /**
+     * Cache for Flink kubernetes clients. Key is master URL and namespace. Value is
+     * Fabric8FlinkKubeClient and reference count.
+     */
+    @GuardedBy("LOCK")
+    private static final Map<Tuple2<String, String>, Tuple2<Fabric8FlinkKubeClient, Integer>>
+            CACHE = new HashMap<>();
+    /** Lock used to protect modification of CACHE. */
+    private static final Object LOCK = new Object();
+
     private final NamespacedKubernetesClient internalClient;
     private final String clusterId;
     private final String namespace;
     private final int maxRetryAttempts;
-    private final Executor kubeClientExecutorService;
+    private final ExecutorService kubeClientExecutorService;
+
+    /* F0 is master URL, and F1 is namespace. */
+    private final Tuple2<String, String> cacheKey;

Review comment:
Let's not use `TupleX`. I think this class is not very expressive.
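To illustrate the `TupleX` remark: a minimal sketch of a named key type that could stand in for `Tuple2<String, String>` as the cache key. The class `KubeClientCacheKey` and its accessors are hypothetical (they are neither in the PR nor in Flink), and the sketch avoids records so that it also compiles on Java 8.

```java
import java.util.Objects;

/** Hypothetical cache key: names its fields instead of relying on f0/f1. */
final class KubeClientCacheKey {
    private final String masterUrl;
    private final String namespace;

    KubeClientCacheKey(String masterUrl, String namespace) {
        this.masterUrl = masterUrl;
        this.namespace = namespace;
    }

    String getMasterUrl() {
        return masterUrl;
    }

    String getNamespace() {
        return namespace;
    }

    // Value semantics so the key works in a HashMap; Objects.equals tolerates nulls.
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof KubeClientCacheKey)) {
            return false;
        }
        KubeClientCacheKey that = (KubeClientCacheKey) o;
        return Objects.equals(masterUrl, that.masterUrl)
                && Objects.equals(namespace, that.namespace);
    }

    @Override
    public int hashCode() {
        return Objects.hash(masterUrl, namespace);
    }

    // Readable form, e.g. for thread names such as "flink-kubeclient-io@" + key.
    @Override
    public String toString() {
        return namespace + "@" + masterUrl;
    }
}
```

With such a type, `get` would build `new KubeClientCacheKey(kubeConfig.getMasterUrl(), kubeConfig.getNamespace())`, and the `/* F0 is master URL, and F1 is namespace. */` comment would no longer be needed.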
##########
File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.java
##########
@@ -342,7 +367,22 @@ public KubernetesWatch watchConfigMaps(
     @Override
     public void close() {
+        synchronized (LOCK) {
+            final Tuple2<Fabric8FlinkKubeClient, Integer> cachedClient = CACHE.get(cacheKey);
+            if (cachedClient != null) {
+                if (--cachedClient.f1 == 0) {
+                    cachedClient.f0.internalClose();
+                    CACHE.remove(cacheKey);
+                }
+            } else {
+                internalClose();
+            }

Review comment:
This case distinction also shows that something is not right. Because we can either instantiate `Fabric8FlinkKubeClient` directly or use the `get` method, which registers the client in the `CACHE`, we need this special casing here.

##########
File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.java
##########
@@ -354,6 +394,29 @@ public KubernetesPod loadPodFromTemplateFile(File file) {
         return new KubernetesPod(this.internalClient.pods().load(file).get());
     }

+    public static FlinkKubeClient get(Configuration flinkConfig, Config kubeConfig) {
+        synchronized (LOCK) {
+            final Tuple2<String, String> cacheKey =
+                    Tuple2.of(kubeConfig.getMasterUrl(), kubeConfig.getNamespace());
+            if (CACHE.containsKey(cacheKey)) {
+                CACHE.get(cacheKey).f1++;
+                return CACHE.get(cacheKey).f0;
+            }
+
+            final NamespacedKubernetesClient client = new DefaultKubernetesClient(kubeConfig);
+            final int poolSize =
+                    flinkConfig.get(
+                            KubernetesConfigOptions.KUBERNETES_CLIENT_IO_EXECUTOR_POOL_SIZE);
+            final ExecutorService executorService =
+                    Executors.newFixedThreadPool(
+                            poolSize, new ExecutorThreadFactory("flink-kubeclient-io@" + cacheKey));
+            final Fabric8FlinkKubeClient flinkKubeClient =
+                    new Fabric8FlinkKubeClient(flinkConfig, client, () -> executorService);
+            CACHE.put(cacheKey, Tuple2.of(flinkKubeClient, 1));
+            return flinkKubeClient;
+        }
+    }

Review comment:
I would first try out how it performs without the `CACHE`.
I am sceptical whether this is really needed.

--
This is an automated message from the Apache Git Service.
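On the case distinction in `close()`: if sharing turns out to be worth keeping, one way to get a single close path is for every client, however it was created, to hold one reference to a reference-counted resource and to only call `release()` from `close()`. A minimal sketch, assuming the shared resource is the I/O executor; `SharedIoExecutor`, `retain()` and `release()` are hypothetical names, not Flink APIs. If the `CACHE` proves unnecessary, the simpler alternative is that each client creates and shuts down its own executor.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Hypothetical reference-counted wrapper around a shared I/O executor. Every client
 * holds one reference and calls release() from close(), so close() needs no
 * "was I cached?" check.
 */
final class SharedIoExecutor {
    private final ExecutorService executor;
    private int refCount = 1; // the creator holds the first reference

    SharedIoExecutor(int poolSize) {
        this.executor = Executors.newFixedThreadPool(poolSize);
    }

    /** Registers another user; fails once the pool has been shut down. */
    synchronized SharedIoExecutor retain() {
        if (refCount == 0) {
            throw new IllegalStateException("executor already released");
        }
        refCount++;
        return this;
    }

    /** Drops one reference; the last release shuts the pool down. */
    synchronized void release() {
        if (refCount == 0) {
            throw new IllegalStateException("released more often than retained");
        }
        if (--refCount == 0) {
            executor.shutdownNow();
        }
    }

    ExecutorService executor() {
        return executor;
    }

    synchronized int refCount() {
        return refCount;
    }
}
```

A directly instantiated client would create its own `SharedIoExecutor` (count 1), while a cached lookup would call `retain()`; in both cases `close()` reduces to `release()`.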