This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 4649b8e546 [INLONG-9825][Manager] Reduce the creation of RestClusterClient (#9826)
4649b8e546 is described below
commit 4649b8e546c0ab60c958f380d2e8636cdec76800
Author: AloysZhang <[email protected]>
AuthorDate: Fri Mar 15 20:35:26 2024 +0800
[INLONG-9825][Manager] Reduce the creation of RestClusterClient (#9826)
---
.../inlong/manager/plugin/flink/FlinkClientService.java | 16 +++++++---------
.../inlong/manager/plugin/flink/FlinkClientService.java | 16 +++++++---------
2 files changed, 14 insertions(+), 18 deletions(-)
diff --git a/inlong-manager/manager-plugins/manager-plugins-flink-v1.13/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkClientService.java b/inlong-manager/manager-plugins/manager-plugins-flink-v1.13/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkClientService.java
index 4256db3842..b4abe7d3be 100644
--- a/inlong-manager/manager-plugins/manager-plugins-flink-v1.13/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkClientService.java
+++ b/inlong-manager/manager-plugins/manager-plugins-flink-v1.13/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkClientService.java
@@ -34,9 +34,11 @@ import java.util.concurrent.CompletableFuture;
public class FlinkClientService {
private final Configuration configuration;
+ private final RestClusterClient<StandaloneClusterId> flinkClient;
- public FlinkClientService(Configuration configuration) {
+ public FlinkClientService(Configuration configuration) throws Exception {
this.configuration = configuration;
+ this.flinkClient = getFlinkClient();
}
/**
@@ -56,9 +58,8 @@ public class FlinkClientService {
*/
public JobStatus getJobStatus(String jobId) throws Exception {
try {
- RestClusterClient<StandaloneClusterId> client = getFlinkClient();
JobID jobID = JobID.fromHexString(jobId);
- CompletableFuture<JobStatus> jobStatus = client.getJobStatus(jobID);
+ CompletableFuture<JobStatus> jobStatus = flinkClient.getJobStatus(jobID);
return jobStatus.get();
} catch (Exception e) {
log.error("get job status by jobId={} failed: ", jobId, e);
@@ -71,9 +72,8 @@ public class FlinkClientService {
*/
public JobDetailsInfo getJobDetail(String jobId) throws Exception {
try {
- RestClusterClient<StandaloneClusterId> client = getFlinkClient();
JobID jobID = JobID.fromHexString(jobId);
- CompletableFuture<JobDetailsInfo> jobDetails = client.getJobDetails(jobID);
+ CompletableFuture<JobDetailsInfo> jobDetails = flinkClient.getJobDetails(jobID);
return jobDetails.get();
} catch (Exception e) {
log.error("get job detail by jobId={} failed: ", jobId, e);
@@ -86,9 +86,8 @@ public class FlinkClientService {
*/
public String stopJob(String jobId, boolean isDrain, String savepointDirectory) throws Exception {
try {
- RestClusterClient<StandaloneClusterId> client = getFlinkClient();
JobID jobID = JobID.fromHexString(jobId);
- CompletableFuture<String> stopResult = client.stopWithSavepoint(jobID, isDrain, savepointDirectory);
+ CompletableFuture<String> stopResult = flinkClient.stopWithSavepoint(jobID, isDrain, savepointDirectory);
return stopResult.get();
} catch (Exception e) {
log.error("stop job {} failed and savepoint directory is {} : ", jobId, savepointDirectory, e);
@@ -101,9 +100,8 @@ public class FlinkClientService {
*/
public void cancelJob(String jobId) throws Exception {
try {
- RestClusterClient<StandaloneClusterId> client = getFlinkClient();
JobID jobID = JobID.fromHexString(jobId);
- client.cancel(jobID);
+ flinkClient.cancel(jobID);
} catch (Exception e) {
log.error("cancel job {} failed: ", jobId, e);
throw new Exception("cancel job " + jobId + " failed: " + e.getMessage());
diff --git a/inlong-manager/manager-plugins/manager-plugins-flink-v1.15/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkClientService.java b/inlong-manager/manager-plugins/manager-plugins-flink-v1.15/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkClientService.java
index 390db34188..58094e9bf7 100644
--- a/inlong-manager/manager-plugins/manager-plugins-flink-v1.15/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkClientService.java
+++ b/inlong-manager/manager-plugins/manager-plugins-flink-v1.15/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkClientService.java
@@ -35,9 +35,11 @@ import java.util.concurrent.CompletableFuture;
public class FlinkClientService {
private final Configuration configuration;
+ private final RestClusterClient<StandaloneClusterId> flinkClient;
- public FlinkClientService(Configuration configuration) {
+ public FlinkClientService(Configuration configuration) throws Exception {
this.configuration = configuration;
+ this.flinkClient = getFlinkClient();
}
/**
@@ -57,9 +59,8 @@ public class FlinkClientService {
*/
public JobStatus getJobStatus(String jobId) throws Exception {
try {
- RestClusterClient<StandaloneClusterId> client = getFlinkClient();
JobID jobID = JobID.fromHexString(jobId);
- CompletableFuture<JobStatus> jobStatus = client.getJobStatus(jobID);
+ CompletableFuture<JobStatus> jobStatus = flinkClient.getJobStatus(jobID);
return jobStatus.get();
} catch (Exception e) {
log.error("get job status by jobId={} failed: ", jobId, e);
@@ -72,9 +73,8 @@ public class FlinkClientService {
*/
public JobDetailsInfo getJobDetail(String jobId) throws Exception {
try {
- RestClusterClient<StandaloneClusterId> client = getFlinkClient();
JobID jobID = JobID.fromHexString(jobId);
- CompletableFuture<JobDetailsInfo> jobDetails = client.getJobDetails(jobID);
+ CompletableFuture<JobDetailsInfo> jobDetails = flinkClient.getJobDetails(jobID);
return jobDetails.get();
} catch (Exception e) {
log.error("get job detail by jobId={} failed: ", jobId, e);
@@ -87,9 +87,8 @@ public class FlinkClientService {
*/
public String stopJob(String jobId, boolean isDrain, String savepointDirectory) throws Exception {
try {
- RestClusterClient<StandaloneClusterId> client = getFlinkClient();
JobID jobID = JobID.fromHexString(jobId);
- CompletableFuture<String> stopResult = client.stopWithSavepoint(jobID, isDrain, savepointDirectory,
+ CompletableFuture<String> stopResult = flinkClient.stopWithSavepoint(jobID, isDrain, savepointDirectory,
SavepointFormatType.CANONICAL);
return stopResult.get();
} catch (Exception e) {
@@ -103,9 +102,8 @@ public class FlinkClientService {
*/
public void cancelJob(String jobId) throws Exception {
try {
- RestClusterClient<StandaloneClusterId> client = getFlinkClient();
JobID jobID = JobID.fromHexString(jobId);
- client.cancel(jobID);
+ flinkClient.cancel(jobID);
} catch (Exception e) {
log.error("cancel job {} failed: ", jobId, e);
throw new Exception("cancel job " + jobId + " failed: " + e.getMessage());