This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 4649b8e546 [INLONG-9825][Manager] Reduce the creation of 
RestClusterClient  (#9826)
4649b8e546 is described below

commit 4649b8e546c0ab60c958f380d2e8636cdec76800
Author: AloysZhang <[email protected]>
AuthorDate: Fri Mar 15 20:35:26 2024 +0800

    [INLONG-9825][Manager] Reduce the creation of RestClusterClient  (#9826)
---
 .../inlong/manager/plugin/flink/FlinkClientService.java  | 16 +++++++---------
 .../inlong/manager/plugin/flink/FlinkClientService.java  | 16 +++++++---------
 2 files changed, 14 insertions(+), 18 deletions(-)

diff --git 
a/inlong-manager/manager-plugins/manager-plugins-flink-v1.13/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkClientService.java
 
b/inlong-manager/manager-plugins/manager-plugins-flink-v1.13/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkClientService.java
index 4256db3842..b4abe7d3be 100644
--- 
a/inlong-manager/manager-plugins/manager-plugins-flink-v1.13/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkClientService.java
+++ 
b/inlong-manager/manager-plugins/manager-plugins-flink-v1.13/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkClientService.java
@@ -34,9 +34,11 @@ import java.util.concurrent.CompletableFuture;
 public class FlinkClientService {
 
     private final Configuration configuration;
+    private final RestClusterClient<StandaloneClusterId> flinkClient;
 
-    public FlinkClientService(Configuration configuration) {
+    public FlinkClientService(Configuration configuration) throws Exception {
         this.configuration = configuration;
+        this.flinkClient = getFlinkClient();
     }
 
     /**
@@ -56,9 +58,8 @@ public class FlinkClientService {
      */
     public JobStatus getJobStatus(String jobId) throws Exception {
         try {
-            RestClusterClient<StandaloneClusterId> client = getFlinkClient();
             JobID jobID = JobID.fromHexString(jobId);
-            CompletableFuture<JobStatus> jobStatus = 
client.getJobStatus(jobID);
+            CompletableFuture<JobStatus> jobStatus = 
flinkClient.getJobStatus(jobID);
             return jobStatus.get();
         } catch (Exception e) {
             log.error("get job status by jobId={} failed: ", jobId, e);
@@ -71,9 +72,8 @@ public class FlinkClientService {
      */
     public JobDetailsInfo getJobDetail(String jobId) throws Exception {
         try {
-            RestClusterClient<StandaloneClusterId> client = getFlinkClient();
             JobID jobID = JobID.fromHexString(jobId);
-            CompletableFuture<JobDetailsInfo> jobDetails = 
client.getJobDetails(jobID);
+            CompletableFuture<JobDetailsInfo> jobDetails = 
flinkClient.getJobDetails(jobID);
             return jobDetails.get();
         } catch (Exception e) {
             log.error("get job detail by jobId={} failed: ", jobId, e);
@@ -86,9 +86,8 @@ public class FlinkClientService {
      */
     public String stopJob(String jobId, boolean isDrain, String 
savepointDirectory) throws Exception {
         try {
-            RestClusterClient<StandaloneClusterId> client = getFlinkClient();
             JobID jobID = JobID.fromHexString(jobId);
-            CompletableFuture<String> stopResult = 
client.stopWithSavepoint(jobID, isDrain, savepointDirectory);
+            CompletableFuture<String> stopResult = 
flinkClient.stopWithSavepoint(jobID, isDrain, savepointDirectory);
             return stopResult.get();
         } catch (Exception e) {
             log.error("stop job {} failed and savepoint directory is {} : ", 
jobId, savepointDirectory, e);
@@ -101,9 +100,8 @@ public class FlinkClientService {
      */
     public void cancelJob(String jobId) throws Exception {
         try {
-            RestClusterClient<StandaloneClusterId> client = getFlinkClient();
             JobID jobID = JobID.fromHexString(jobId);
-            client.cancel(jobID);
+            flinkClient.cancel(jobID);
         } catch (Exception e) {
             log.error("cancel job {} failed: ", jobId, e);
             throw new Exception("cancel job " + jobId + " failed: " + 
e.getMessage());
diff --git 
a/inlong-manager/manager-plugins/manager-plugins-flink-v1.15/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkClientService.java
 
b/inlong-manager/manager-plugins/manager-plugins-flink-v1.15/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkClientService.java
index 390db34188..58094e9bf7 100644
--- 
a/inlong-manager/manager-plugins/manager-plugins-flink-v1.15/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkClientService.java
+++ 
b/inlong-manager/manager-plugins/manager-plugins-flink-v1.15/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkClientService.java
@@ -35,9 +35,11 @@ import java.util.concurrent.CompletableFuture;
 public class FlinkClientService {
 
     private final Configuration configuration;
+    private final RestClusterClient<StandaloneClusterId> flinkClient;
 
-    public FlinkClientService(Configuration configuration) {
+    public FlinkClientService(Configuration configuration) throws Exception {
         this.configuration = configuration;
+        this.flinkClient = getFlinkClient();
     }
 
     /**
@@ -57,9 +59,8 @@ public class FlinkClientService {
      */
     public JobStatus getJobStatus(String jobId) throws Exception {
         try {
-            RestClusterClient<StandaloneClusterId> client = getFlinkClient();
             JobID jobID = JobID.fromHexString(jobId);
-            CompletableFuture<JobStatus> jobStatus = 
client.getJobStatus(jobID);
+            CompletableFuture<JobStatus> jobStatus = 
flinkClient.getJobStatus(jobID);
             return jobStatus.get();
         } catch (Exception e) {
             log.error("get job status by jobId={} failed: ", jobId, e);
@@ -72,9 +73,8 @@ public class FlinkClientService {
      */
     public JobDetailsInfo getJobDetail(String jobId) throws Exception {
         try {
-            RestClusterClient<StandaloneClusterId> client = getFlinkClient();
             JobID jobID = JobID.fromHexString(jobId);
-            CompletableFuture<JobDetailsInfo> jobDetails = 
client.getJobDetails(jobID);
+            CompletableFuture<JobDetailsInfo> jobDetails = 
flinkClient.getJobDetails(jobID);
             return jobDetails.get();
         } catch (Exception e) {
             log.error("get job detail by jobId={} failed: ", jobId, e);
@@ -87,9 +87,8 @@ public class FlinkClientService {
      */
     public String stopJob(String jobId, boolean isDrain, String 
savepointDirectory) throws Exception {
         try {
-            RestClusterClient<StandaloneClusterId> client = getFlinkClient();
             JobID jobID = JobID.fromHexString(jobId);
-            CompletableFuture<String> stopResult = 
client.stopWithSavepoint(jobID, isDrain, savepointDirectory,
+            CompletableFuture<String> stopResult = 
flinkClient.stopWithSavepoint(jobID, isDrain, savepointDirectory,
                     SavepointFormatType.CANONICAL);
             return stopResult.get();
         } catch (Exception e) {
@@ -103,9 +102,8 @@ public class FlinkClientService {
      */
     public void cancelJob(String jobId) throws Exception {
         try {
-            RestClusterClient<StandaloneClusterId> client = getFlinkClient();
             JobID jobID = JobID.fromHexString(jobId);
-            client.cancel(jobID);
+            flinkClient.cancel(jobID);
         } catch (Exception e) {
             log.error("cancel job {} failed: ", jobId, e);
             throw new Exception("cancel job " + jobId + " failed: " + 
e.getMessage());

Reply via email to