This is an automated email from the ASF dual-hosted git repository.

zhongjiajie pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 6672e9407 [Improve] Add a jobId to the doris label to distinguish between tasks (#4839)
6672e9407 is described below

commit 6672e940773224d0b4c6bc40a789126cc8c69dbd
Author: Carl-Zhou-CN <[email protected]>
AuthorDate: Tue May 30 19:02:38 2023 +0800

    [Improve] Add a jobId to the doris label to distinguish between tasks (#4839)
    
    Co-authored-by: zhouyao <[email protected]>
---
 .../org/apache/seatunnel/connectors/doris/sink/DorisSink.java | 11 +++++++++--
 .../connectors/doris/sink/writer/DorisSinkWriter.java         |  6 ++++--
 2 files changed, 13 insertions(+), 4 deletions(-)

diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSink.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSink.java
index 018eb44bd..2c6d6ae74 100644
--- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSink.java
+++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSink.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.connectors.doris.sink;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
+import org.apache.seatunnel.api.common.JobContext;
 import org.apache.seatunnel.api.common.PrepareFailException;
 import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
 import org.apache.seatunnel.api.serialization.Serializer;
@@ -54,6 +55,7 @@ public class DorisSink
 
     private Config pluginConfig;
     private SeaTunnelRowType seaTunnelRowType;
+    private String jobId;
 
     @Override
     public String getPluginName() {
@@ -78,6 +80,11 @@ public class DorisSink
         }
     }
 
+    @Override
+    public void setJobContext(JobContext jobContext) {
+        this.jobId = jobContext.getJobId();
+    }
+
     @Override
     public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
         this.seaTunnelRowType = seaTunnelRowType;
@@ -93,7 +100,7 @@ public class DorisSink
             SinkWriter.Context context) throws IOException {
         DorisSinkWriter dorisSinkWriter =
                 new DorisSinkWriter(
-                        context, Collections.emptyList(), seaTunnelRowType, pluginConfig);
+                        context, Collections.emptyList(), seaTunnelRowType, pluginConfig, jobId);
         dorisSinkWriter.initializeLoad(Collections.emptyList());
         return dorisSinkWriter;
     }
@@ -102,7 +109,7 @@ public class DorisSink
     public SinkWriter<SeaTunnelRow, DorisCommitInfo, DorisSinkState> restoreWriter(
             SinkWriter.Context context, List<DorisSinkState> states) throws IOException {
         DorisSinkWriter dorisWriter =
-                new DorisSinkWriter(context, states, seaTunnelRowType, pluginConfig);
+                new DorisSinkWriter(context, states, seaTunnelRowType, pluginConfig, jobId);
         dorisWriter.initializeLoad(states);
         return dorisWriter;
     }
diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java
index 744db83e6..ac0927f08 100644
--- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java
+++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java
@@ -78,13 +78,15 @@ public class DorisSinkWriter implements SinkWriter<SeaTunnelRow, DorisCommitInfo
             SinkWriter.Context context,
             List<DorisSinkState> state,
             SeaTunnelRowType seaTunnelRowType,
-            Config pluginConfig) {
+            Config pluginConfig,
+            String jobId) {
         this.dorisConfig = DorisConfig.loadConfig(pluginConfig);
         this.lastCheckpointId = state.size() != 0 ? state.get(0).getCheckpointId() : 0;
         log.info("restore checkpointId {}", lastCheckpointId);
         log.info("labelPrefix " + dorisConfig.getLabelPrefix());
         this.dorisSinkState = new DorisSinkState(dorisConfig.getLabelPrefix(), lastCheckpointId);
-        this.labelPrefix = dorisConfig.getLabelPrefix() + "_" + context.getIndexOfSubtask();
+        this.labelPrefix =
+                dorisConfig.getLabelPrefix() + "_" + jobId + "_" + context.getIndexOfSubtask();
         this.labelGenerator = new LabelGenerator(labelPrefix, dorisConfig.getEnable2PC());
         this.scheduledExecutorService =
                 new ScheduledThreadPoolExecutor(

Reply via email to