This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-kafka-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 083b343  [Improve]Improve the length of generated stream load label (#18)
083b343 is described below

commit 083b343cbf880a90b60dffab3ff7397b34f7ae22
Author: wudongliang <46414265+donglian...@users.noreply.github.com>
AuthorDate: Fri May 17 16:36:56 2024 +0800

    [Improve]Improve the length of generated stream load label (#18)
---
 .../doris/kafka/connector/utils/FileNameUtils.java |  2 +-
 .../kafka/connector/writer/LabelGenerator.java     | 39 +++++++---------------
 .../kafka/connector/writer/StreamLoadWriter.java   | 11 +-----
 .../connector/writer/TestStreamLoadWriter.java     |  8 ++---
 4 files changed, 18 insertions(+), 42 deletions(-)

diff --git a/src/main/java/org/apache/doris/kafka/connector/utils/FileNameUtils.java b/src/main/java/org/apache/doris/kafka/connector/utils/FileNameUtils.java
index 052f03c..8c00f43 100644
--- a/src/main/java/org/apache/doris/kafka/connector/utils/FileNameUtils.java
+++ b/src/main/java/org/apache/doris/kafka/connector/utils/FileNameUtils.java
@@ -89,7 +89,7 @@ public class FileNameUtils {
     }
 
     public static long labelToEndOffset(String label) {
-        return Long.parseLong(readFromFileName(label, 5));
+        return Long.parseLong(readFromFileName(label, 3));
     }
 
     /**
diff --git a/src/main/java/org/apache/doris/kafka/connector/writer/LabelGenerator.java b/src/main/java/org/apache/doris/kafka/connector/writer/LabelGenerator.java
index 349db71..daca8b1 100644
--- a/src/main/java/org/apache/doris/kafka/connector/writer/LabelGenerator.java
+++ b/src/main/java/org/apache/doris/kafka/connector/writer/LabelGenerator.java
@@ -19,55 +19,40 @@
 
 package org.apache.doris.kafka.connector.writer;
 
-import java.util.UUID;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicLong;
 
 /** Generator label for stream load. */
 public class LabelGenerator {
-    private final String labelPrefix;
     private String topic;
     private int partition;
-    private final boolean enable2PC;
     private String tableIdentifier;
-    private int subtaskId;
+    // The label of doris stream load cannot be repeated when loading.
+    // Under special circumstances (usually load failure) when doris-kafka-connector is started,
+    // stream load is performed at the same offset every time, which will cause label duplication.
+    // For this reason, we use labelRandomSuffix to generate a random suffix at startup.
+    private final AtomicLong labelRandomSuffix;
 
-    public LabelGenerator(String labelPrefix, boolean enable2PC) {
-        this.labelPrefix = labelPrefix;
-        this.enable2PC = enable2PC;
-    }
-
-    public LabelGenerator(
-            String labelPrefix,
-            boolean enable2PC,
-            String topic,
-            int partition,
-            String tableIdentifier,
-            int subtaskId) {
-        this(labelPrefix, enable2PC);
+    public LabelGenerator(String topic, int partition, String tableIdentifier) {
         // The label of stream load can not contain `.`
         this.tableIdentifier = tableIdentifier.replaceAll("\\.", "_");
         this.topic = topic.replaceAll("\\.", "_");
-        this.subtaskId = subtaskId;
         this.partition = partition;
+        Random random = new Random();
+        labelRandomSuffix = new AtomicLong(random.nextInt(1000));
     }
 
     public String generateLabel(long lastOffset) {
         StringBuilder sb = new StringBuilder();
-        sb.append(labelPrefix)
-                .append(LoadConstants.FILE_DELIM_DEFAULT)
-                .append(topic)
+        sb.append(topic)
                 .append(LoadConstants.FILE_DELIM_DEFAULT)
                 .append(partition)
                 .append(LoadConstants.FILE_DELIM_DEFAULT)
                 .append(tableIdentifier)
                 .append(LoadConstants.FILE_DELIM_DEFAULT)
-                .append(subtaskId)
-                .append(LoadConstants.FILE_DELIM_DEFAULT)
                 .append(lastOffset)
                 .append(LoadConstants.FILE_DELIM_DEFAULT)
-                .append(System.currentTimeMillis());
-        if (!enable2PC) {
-            sb.append(LoadConstants.FILE_DELIM_DEFAULT).append(UUID.randomUUID());
-        }
+                .append(labelRandomSuffix.getAndIncrement());
         return sb.toString();
     }
 }
diff --git a/src/main/java/org/apache/doris/kafka/connector/writer/StreamLoadWriter.java b/src/main/java/org/apache/doris/kafka/connector/writer/StreamLoadWriter.java
index a8fc00a..54f102a 100644
--- a/src/main/java/org/apache/doris/kafka/connector/writer/StreamLoadWriter.java
+++ b/src/main/java/org/apache/doris/kafka/connector/writer/StreamLoadWriter.java
@@ -62,14 +62,7 @@ public class StreamLoadWriter extends DorisWriter {
             DorisConnectMonitor connectMonitor) {
         super(topic, partition, dorisOptions, connectionProvider, connectMonitor);
         this.taskId = dorisOptions.getTaskId();
-        this.labelGenerator =
-                new LabelGenerator(
-                        dorisOptions.getLabelPrefix(),
-                        true,
-                        topic,
-                        partition,
-                        tableIdentifier,
-                        taskId);
+        this.labelGenerator = new LabelGenerator(topic, partition, tableIdentifier);
         BackendUtils backendUtils = BackendUtils.getInstance(dorisOptions, LOG);
         this.dorisCommitter = new DorisCommitter(dorisOptions, backendUtils);
         this.dorisStreamLoad = new DorisStreamLoad(backendUtils, dorisOptions, topic);
@@ -105,8 +98,6 @@ public class StreamLoadWriter extends DorisWriter {
         String tmpTopic = topic.replaceAll("\\.", "_");
         String querySQL =
                 queryPatten
-                        + dorisOptions.getLabelPrefix()
-                        + LoadConstants.FILE_DELIM_DEFAULT
                         + tmpTopic
                         + LoadConstants.FILE_DELIM_DEFAULT
                         + partition
diff --git a/src/test/java/org/apache/doris/kafka/connector/writer/TestStreamLoadWriter.java b/src/test/java/org/apache/doris/kafka/connector/writer/TestStreamLoadWriter.java
index 0665a09..6f09bf1 100644
--- a/src/test/java/org/apache/doris/kafka/connector/writer/TestStreamLoadWriter.java
+++ b/src/test/java/org/apache/doris/kafka/connector/writer/TestStreamLoadWriter.java
@@ -67,16 +67,16 @@ public class TestStreamLoadWriter {
 
     private void fillLabel2Status() {
         label2Status.put(
-                "sink-connector-test__KC_avro-complex10__KC_2__KC_test_person_complex__KC_2__KC_321__KC_1706149860395",
+                "KC_avro-complex10__KC_2__KC_test_person_complex__KC_321__KC_1706149860395",
                 "ABORT");
         label2Status.put(
-                "sink-connector-test__KC_avro-complex10__KC_2__KC_test_person_complex__KC_2__KC_983__KC_1706149860395",
+                "KC_avro-complex10__KC_2__KC_test_person_complex__KC_983__KC_1706149860395",
                 "ABORT");
         label2Status.put(
-                "sink-connector-test__KC_avro-complex10__KC_2__KC_test_person_complex__KC_2__KC_781__KC_1706149860395",
+                "avro-complex10__KC_2__KC_test_person_complex__KC_781__KC_1706149860395",
                 "VISIBLE");
         label2Status.put(
-                "sink-connector-test__KC_avro-complex10__KC_2__KC_test_person_complex__KC_2__KC_832__KC_1706149860395",
+                "avro-complex10__KC_2__KC_test_person_complex__KC_832__KC_1706149860395",
                 "VISIBLE");
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to