wangyang0918 commented on a change in pull request #13644:
URL: https://github.com/apache/flink/pull/13644#discussion_r509238124



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/AbstractHaServices.java
##########
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.highavailability;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobStore;
+import org.apache.flink.runtime.blob.BlobStoreService;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.util.ExceptionUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.Executor;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Abstract high availability services based on a distributed system (e.g. ZooKeeper, Kubernetes).
+ */
+public abstract class AbstractHaServices implements HighAvailabilityServices {
+
+       protected final Logger logger = LoggerFactory.getLogger(getClass());
+
+       /** The executor to run external IO operations on. */
+       protected final Executor executor;
+
+       /** The runtime configuration. */
+       protected final Configuration configuration;
+
+       /** Store for arbitrary blobs. */
+       private final BlobStoreService blobStoreService;
+
+       /**
+        * Stores the shared dependencies of the high availability services.
+        *
+        * @param executor executor to run external IO operations on; must not be null
+        * @param config runtime configuration; must not be null
+        * @param blobStoreService store for arbitrary blobs; must not be null because
+        *     {@link #close()} and {@link #closeAndCleanupAllData()} close it unconditionally
+        */
+       public AbstractHaServices(Executor executor, Configuration config, BlobStoreService blobStoreService) {
+               this.executor = checkNotNull(executor);
+               this.configuration = checkNotNull(config);
+               // Fail fast at construction instead of with a NullPointerException while closing.
+               this.blobStoreService = checkNotNull(blobStoreService);
+       }
+
+       /**
+        * Creates the leader retrieval service for the name returned by
+        * {@code getLeaderNameForResourceManager()}.
+        */
+       @Override
+       public LeaderRetrievalService getResourceManagerLeaderRetriever() {
+               final String leaderName = getLeaderNameForResourceManager();
+               return createLeaderRetrievalService(leaderName);
+       }
+
+       /**
+        * Creates the leader retrieval service for the name returned by
+        * {@code getLeaderNameForDispatcher()}.
+        */
+       @Override
+       public LeaderRetrievalService getDispatcherLeaderRetriever() {
+               final String leaderName = getLeaderNameForDispatcher();
+               return createLeaderRetrievalService(leaderName);
+       }
+
+       /**
+        * Creates the leader retrieval service for the name returned by
+        * {@code getLeaderNameForJobManager(jobID)}.
+        *
+        * @param jobID job whose leader name is looked up
+        */
+       @Override
+       public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) {
+               final String leaderName = getLeaderNameForJobManager(jobID);
+               return createLeaderRetrievalService(leaderName);
+       }
+
+       /**
+        * Delegates to {@link #getJobManagerLeaderRetriever(JobID)}.
+        *
+        * @param jobID job whose leader retriever is requested
+        * @param defaultJobManagerAddress not used by this implementation
+        */
+       @Override
+       public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID, String defaultJobManagerAddress) {
+               // The default address is ignored; the single-argument overload does the work.
+               final LeaderRetrievalService retriever = getJobManagerLeaderRetriever(jobID);
+               return retriever;
+       }
+
+       /**
+        * Creates the leader retrieval service for the name returned by
+        * {@code getLeaderNameForRestServer()}.
+        */
+       @Override
+       public LeaderRetrievalService getClusterRestEndpointLeaderRetriever() {
+               final String leaderName = getLeaderNameForRestServer();
+               return createLeaderRetrievalService(leaderName);
+       }
+
+       /**
+        * Creates the leader election service for the name returned by
+        * {@code getLeaderNameForResourceManager()}.
+        */
+       @Override
+       public LeaderElectionService getResourceManagerLeaderElectionService() {
+               final String leaderName = getLeaderNameForResourceManager();
+               return createLeaderElectionService(leaderName);
+       }
+
+       /**
+        * Creates the leader election service for the name returned by
+        * {@code getLeaderNameForDispatcher()}.
+        */
+       @Override
+       public LeaderElectionService getDispatcherLeaderElectionService() {
+               final String leaderName = getLeaderNameForDispatcher();
+               return createLeaderElectionService(leaderName);
+       }
+
+       /**
+        * Creates the leader election service for the name returned by
+        * {@code getLeaderNameForJobManager(jobID)}.
+        *
+        * @param jobID job whose leader name is looked up
+        */
+       @Override
+       public LeaderElectionService getJobManagerLeaderElectionService(JobID jobID) {
+               final String leaderName = getLeaderNameForJobManager(jobID);
+               return createLeaderElectionService(leaderName);
+       }
+
+       /**
+        * Creates the leader election service for the name returned by
+        * {@code getLeaderNameForRestServer()}.
+        */
+       @Override
+       public LeaderElectionService getClusterRestEndpointLeaderElectionService() {
+               final String leaderName = getLeaderNameForRestServer();
+               return createLeaderElectionService(leaderName);
+       }
+
+       /** Returns the blob store service that was handed to the constructor. */
+       @Override
+       public BlobStore createBlobStore() {
+               return this.blobStoreService;
+       }
+
+       /**
+        * Closes the blob store service and then the implementation-specific components.
+        *
+        * <p>Both steps are always attempted. A failure of the first step is remembered and a
+        * failure of {@link #internalClose()} is combined with it, so neither error is lost.
+        *
+        * @throws Exception if closing the blob store service or {@link #internalClose()} failed
+        */
+       @Override
+       public void close() throws Exception {
+               Throwable exception = null;
+
+               try {
+                       blobStoreService.close();
+               } catch (Throwable t) {
+                       exception = t;
+               }
+
+               // Guard internalClose() as well: an exception escaping here would otherwise
+               // discard the failure already captured from the blob store service.
+               try {
+                       internalClose();
+               } catch (Throwable t) {
+                       exception = ExceptionUtils.firstOrSuppressed(t, exception);
+               }
+
+               if (exception != null) {
+                       ExceptionUtils.rethrowException(exception, "Could not properly close the " + getClass().getSimpleName());
+               }
+       }
+
+       /**
+        * Closes the blob store service including its data, runs {@code internalCleanup()} and
+        * finally {@link #internalClose()}.
+        *
+        * <p>All three steps are always attempted; failures are collected and rethrown together
+        * once every step has run.
+        *
+        * @throws Exception if any of the steps failed
+        */
+       @Override
+       public void closeAndCleanupAllData() throws Exception {
+               logger.info("Close and clean up all data for {}.", getClass().getSimpleName());
+
+               Throwable exception = null;
+
+               try {
+                       blobStoreService.closeAndCleanupAllData();
+               } catch (Throwable t) {
+                       exception = t;
+               }
+
+               try {
+                       internalCleanup();
+               } catch (Throwable t) {
+                       exception = ExceptionUtils.firstOrSuppressed(t, exception);
+               }
+
+               // Guard internalClose() too, so that a failure here does not hide the
+               // exceptions collected from the previous steps.
+               try {
+                       internalClose();
+               } catch (Throwable t) {
+                       exception = ExceptionUtils.firstOrSuppressed(t, exception);
+               }
+
+               if (exception != null) {
+                       ExceptionUtils.rethrowException(
+                               exception, "Could not properly close and clean up all data of high availability service.");
+               }
+               logger.info("Finished cleaning up the high availability data.");
+       }
+
+       /**
+        * Create leader election service with specified leaderName.
+        *
+        * <p>The leader election getters of this class call this method with the leader name of
+        * the respective component.
+        *
+        * @param leaderName ConfigMap name in Kubernetes or child node path in Zookeeper.
+        * @return Return LeaderElectionService using Zookeeper or Kubernetes.
+        */
+       protected abstract LeaderElectionService createLeaderElectionService(String leaderName);
+
+       /**
+        * Create leader retrieval service with specified leaderName.
+        *
+        * <p>The leader retriever getters of this class call this method with the leader name of
+        * the respective component.
+        *
+        * @param leaderName ConfigMap name in Kubernetes or child node path in Zookeeper.
+        * @return Return LeaderRetrievalService using Zookeeper or Kubernetes.
+        */
+       protected abstract LeaderRetrievalService createLeaderRetrievalService(String leaderName);
+
+       /**
+        * Closes components which don't distinguish between close and closeAndCleanupAllData.
+        *
+        * <p>Invoked by both {@link #close()} and {@link #closeAndCleanupAllData()}.
+        *
+        * @throws Exception if closing the components fails
+        */
+       protected abstract void internalClose() throws Exception;

Review comment:
       I will remove the `throws Exception` clause here, since neither 
`FlinkKubeClient#close` nor `CuratorFramework#close` throws.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to