This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-kafka-connector.git
The following commit(s) were added to refs/heads/master by this push: new 083b343 [Improve]Improve the length of generated stream load label (#18) 083b343 is described below commit 083b343cbf880a90b60dffab3ff7397b34f7ae22 Author: wudongliang <46414265+donglian...@users.noreply.github.com> AuthorDate: Fri May 17 16:36:56 2024 +0800 [Improve]Improve the length of generated stream load label (#18) --- .../doris/kafka/connector/utils/FileNameUtils.java | 2 +- .../kafka/connector/writer/LabelGenerator.java | 39 +++++++--------------- .../kafka/connector/writer/StreamLoadWriter.java | 11 +----- .../connector/writer/TestStreamLoadWriter.java | 8 ++--- 4 files changed, 18 insertions(+), 42 deletions(-) diff --git a/src/main/java/org/apache/doris/kafka/connector/utils/FileNameUtils.java b/src/main/java/org/apache/doris/kafka/connector/utils/FileNameUtils.java index 052f03c..8c00f43 100644 --- a/src/main/java/org/apache/doris/kafka/connector/utils/FileNameUtils.java +++ b/src/main/java/org/apache/doris/kafka/connector/utils/FileNameUtils.java @@ -89,7 +89,7 @@ public class FileNameUtils { } public static long labelToEndOffset(String label) { - return Long.parseLong(readFromFileName(label, 5)); + return Long.parseLong(readFromFileName(label, 3)); } /** diff --git a/src/main/java/org/apache/doris/kafka/connector/writer/LabelGenerator.java b/src/main/java/org/apache/doris/kafka/connector/writer/LabelGenerator.java index 349db71..daca8b1 100644 --- a/src/main/java/org/apache/doris/kafka/connector/writer/LabelGenerator.java +++ b/src/main/java/org/apache/doris/kafka/connector/writer/LabelGenerator.java @@ -19,55 +19,40 @@ package org.apache.doris.kafka.connector.writer; -import java.util.UUID; +import java.util.Random; +import java.util.concurrent.atomic.AtomicLong; /** Generator label for stream load. */ public class LabelGenerator { - private final String labelPrefix; private String topic; private int partition; - private final boolean enable2PC; private String tableIdentifier; - private int subtaskId; + // The label of doris stream load cannot be repeated when loading. + // Under special circumstances (usually load failure) when doris-kafka-connector is started, + // stream load is performed at the same offset every time, which will cause label duplication. + // For this reason, we use labelRandomSuffix to generate a random suffix at startup. + private final AtomicLong labelRandomSuffix; - public LabelGenerator(String labelPrefix, boolean enable2PC) { - this.labelPrefix = labelPrefix; - this.enable2PC = enable2PC; - } - - public LabelGenerator( - String labelPrefix, - boolean enable2PC, - String topic, - int partition, - String tableIdentifier, - int subtaskId) { - this(labelPrefix, enable2PC); + public LabelGenerator(String topic, int partition, String tableIdentifier) { // The label of stream load can not contain `.` this.tableIdentifier = tableIdentifier.replaceAll("\\.", "_"); this.topic = topic.replaceAll("\\.", "_"); - this.subtaskId = subtaskId; this.partition = partition; + Random random = new Random(); + labelRandomSuffix = new AtomicLong(random.nextInt(1000)); } public String generateLabel(long lastOffset) { StringBuilder sb = new StringBuilder(); - sb.append(labelPrefix) - .append(LoadConstants.FILE_DELIM_DEFAULT) - .append(topic) + sb.append(topic) .append(LoadConstants.FILE_DELIM_DEFAULT) .append(partition) .append(LoadConstants.FILE_DELIM_DEFAULT) .append(tableIdentifier) .append(LoadConstants.FILE_DELIM_DEFAULT) - .append(subtaskId) - .append(LoadConstants.FILE_DELIM_DEFAULT) .append(lastOffset) .append(LoadConstants.FILE_DELIM_DEFAULT) - .append(System.currentTimeMillis()); - if (!enable2PC) { - sb.append(LoadConstants.FILE_DELIM_DEFAULT).append(UUID.randomUUID()); - } + .append(labelRandomSuffix.getAndIncrement()); return sb.toString(); } } diff --git a/src/main/java/org/apache/doris/kafka/connector/writer/StreamLoadWriter.java b/src/main/java/org/apache/doris/kafka/connector/writer/StreamLoadWriter.java index a8fc00a..54f102a 100644 --- a/src/main/java/org/apache/doris/kafka/connector/writer/StreamLoadWriter.java +++ b/src/main/java/org/apache/doris/kafka/connector/writer/StreamLoadWriter.java @@ -62,14 +62,7 @@ public class StreamLoadWriter extends DorisWriter { DorisConnectMonitor connectMonitor) { super(topic, partition, dorisOptions, connectionProvider, connectMonitor); this.taskId = dorisOptions.getTaskId(); - this.labelGenerator = - new LabelGenerator( - dorisOptions.getLabelPrefix(), - true, - topic, - partition, - tableIdentifier, - taskId); + this.labelGenerator = new LabelGenerator(topic, partition, tableIdentifier); BackendUtils backendUtils = BackendUtils.getInstance(dorisOptions, LOG); this.dorisCommitter = new DorisCommitter(dorisOptions, backendUtils); this.dorisStreamLoad = new DorisStreamLoad(backendUtils, dorisOptions, topic); @@ -105,8 +98,6 @@ public class StreamLoadWriter extends DorisWriter { String tmpTopic = topic.replaceAll("\\.", "_"); String querySQL = queryPatten - + dorisOptions.getLabelPrefix() - + LoadConstants.FILE_DELIM_DEFAULT + tmpTopic + LoadConstants.FILE_DELIM_DEFAULT + partition diff --git a/src/test/java/org/apache/doris/kafka/connector/writer/TestStreamLoadWriter.java b/src/test/java/org/apache/doris/kafka/connector/writer/TestStreamLoadWriter.java index 0665a09..6f09bf1 100644 --- a/src/test/java/org/apache/doris/kafka/connector/writer/TestStreamLoadWriter.java +++ b/src/test/java/org/apache/doris/kafka/connector/writer/TestStreamLoadWriter.java @@ -67,16 +67,16 @@ public class TestStreamLoadWriter { private void fillLabel2Status() { label2Status.put( - "sink-connector-test__KC_avro-complex10__KC_2__KC_test_person_complex__KC_2__KC_321__KC_1706149860395", + "KC_avro-complex10__KC_2__KC_test_person_complex__KC_321__KC_1706149860395", "ABORT"); label2Status.put( - "sink-connector-test__KC_avro-complex10__KC_2__KC_test_person_complex__KC_2__KC_983__KC_1706149860395", + "KC_avro-complex10__KC_2__KC_test_person_complex__KC_983__KC_1706149860395", "ABORT"); label2Status.put( - "sink-connector-test__KC_avro-complex10__KC_2__KC_test_person_complex__KC_2__KC_781__KC_1706149860395", + "avro-complex10__KC_2__KC_test_person_complex__KC_781__KC_1706149860395", "VISIBLE"); label2Status.put( - "sink-connector-test__KC_avro-complex10__KC_2__KC_test_person_complex__KC_2__KC_832__KC_1706149860395", + "avro-complex10__KC_2__KC_test_person_complex__KC_832__KC_1706149860395", "VISIBLE"); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org