This is an automated email from the ASF dual-hosted git repository.

JNSimba pushed a commit to branch branch-for-flink-before-1.13
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/branch-for-flink-before-1.13 
by this push:
     new 73647755 [Improve] add sink.label-prefix for 1.13 and add ignore 
properties (#647)
73647755 is described below

commit 736477558708fb5f8338f6dd043733d544e92912
Author: wudi <[email protected]>
AuthorDate: Tue May 19 14:30:08 2026 +0800

    [Improve] add sink.label-prefix for 1.13 and add ignore properties (#647)
---
 .../apache/doris/flink/cfg/DorisExecutionOptions.java    | 16 ++++++++++++++--
 .../org/apache/doris/flink/rest/models/BackendV2.java    |  2 +-
 .../java/org/apache/doris/flink/rest/models/Field.java   |  3 +++
 .../org/apache/doris/flink/rest/models/QueryPlan.java    |  3 +++
 .../java/org/apache/doris/flink/rest/models/Schema.java  |  3 +++
 .../java/org/apache/doris/flink/rest/models/Tablet.java  |  5 ++++-
 .../doris/flink/table/DorisDynamicOutputFormat.java      |  3 ++-
 .../doris/flink/table/DorisDynamicTableFactory.java      |  7 +++++++
 .../org/apache/doris/flink/table/DorisStreamLoad.java    | 11 +++++------
 9 files changed, 42 insertions(+), 11 deletions(-)

diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
index 6d3a4eaa..6e192a40 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
@@ -45,8 +45,9 @@ public class DorisExecutionOptions implements Serializable {
 
     private final Boolean enableDelete;
 
+    private final String labelPrefix;
 
-    public DorisExecutionOptions(Integer batchSize, Integer maxRetries, Long 
batchIntervalMs, Properties streamLoadProp, Boolean enableDelete, Long 
maxBatchBytes) {
+    public DorisExecutionOptions(Integer batchSize, Integer maxRetries, Long 
batchIntervalMs, Properties streamLoadProp, Boolean enableDelete, Long 
maxBatchBytes, String labelPrefix) {
         Preconditions.checkArgument(maxRetries >= 0);
         Preconditions.checkArgument(maxBatchBytes >= 0);
         this.batchSize = batchSize;
@@ -55,6 +56,7 @@ public class DorisExecutionOptions implements Serializable {
         this.streamLoadProp = streamLoadProp;
         this.enableDelete = enableDelete;
         this.maxBatchBytes = maxBatchBytes;
+        this.labelPrefix = labelPrefix;
     }
 
     public static Builder builder() {
@@ -92,6 +94,10 @@ public class DorisExecutionOptions implements Serializable {
         return maxBatchBytes;
     }
 
+    public String getLabelPrefix() {
+        return labelPrefix;
+    }
+
     /**
      * Builder of {@link DorisExecutionOptions}.
      */
@@ -102,6 +108,7 @@ public class DorisExecutionOptions implements Serializable {
         private Properties streamLoadProp = new Properties();
         private Boolean enableDelete = false;
         private Long maxBatchBytes = DEFAULT_MAX_BATCH_BYTES;
+        private String labelPrefix;
 
         public Builder setBatchSize(Integer batchSize) {
             this.batchSize = batchSize;
@@ -133,8 +140,13 @@ public class DorisExecutionOptions implements Serializable 
{
             return this;
         }
 
+        public Builder setLabelPrefix(String labelPrefix) {
+            this.labelPrefix = labelPrefix;
+            return this;
+        }
+
         public DorisExecutionOptions build() {
-            return new DorisExecutionOptions(batchSize, maxRetries, 
batchIntervalMs, streamLoadProp, enableDelete, maxBatchBytes);
+            return new DorisExecutionOptions(batchSize, maxRetries, 
batchIntervalMs, streamLoadProp, enableDelete, maxBatchBytes, labelPrefix);
         }
     }
 
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/BackendV2.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/BackendV2.java
index a6b5cf3f..d7f0116d 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/BackendV2.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/BackendV2.java
@@ -19,7 +19,6 @@ package org.apache.doris.flink.rest.models;
 
 import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
 import com.fasterxml.jackson.annotation.JsonProperty;
-
 import java.util.List;
 
 /**
@@ -39,6 +38,7 @@ public class BackendV2 {
         this.backends = backends;
     }
 
+    @JsonIgnoreProperties(ignoreUnknown = true)
     public static class BackendRowV2 {
         @JsonProperty("ip")
         public String ip;
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/Field.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/Field.java
index 04341bf5..b45bf118 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/Field.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/Field.java
@@ -17,8 +17,11 @@
 
 package org.apache.doris.flink.rest.models;
 
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+
 import java.util.Objects;
 
+@JsonIgnoreProperties(ignoreUnknown = true)
 public class Field {
     private String name;
     private String type;
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/QueryPlan.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/QueryPlan.java
index e65175ca..5b2ace1e 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/QueryPlan.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/QueryPlan.java
@@ -17,9 +17,12 @@
 
 package org.apache.doris.flink.rest.models;
 
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+
 import java.util.Map;
 import java.util.Objects;
 
+@JsonIgnoreProperties(ignoreUnknown = true)
 public class QueryPlan {
     private int status;
     private String opaqued_query_plan;
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/Schema.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/Schema.java
index 264e7368..cfd015c1 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/Schema.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/Schema.java
@@ -17,10 +17,13 @@
 
 package org.apache.doris.flink.rest.models;
 
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
 
+@JsonIgnoreProperties(ignoreUnknown = true)
 public class Schema {
     private int status = 0;
     private String keysType;
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/Tablet.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/Tablet.java
index 70b0f139..78ed72f6 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/Tablet.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/Tablet.java
@@ -17,10 +17,13 @@
 
 package org.apache.doris.flink.rest.models;
 
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+
 import java.util.List;
 import java.util.Objects;
 
-public class Tablet {
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class    Tablet {
     private List<String> routings;
     private int version;
     private long versionHash;
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicOutputFormat.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicOutputFormat.java
index 993dc651..d0bf8311 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicOutputFormat.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicOutputFormat.java
@@ -207,7 +207,8 @@ public class DorisDynamicOutputFormat<T> extends 
RichOutputFormat<T> {
                 options.getUsername(),
                 options.getPassword(),
                 executionOptions.getStreamLoadProp(),
-                readOptions);
+                readOptions,
+                executionOptions.getLabelPrefix());
 
         if (executionOptions.getBatchIntervalMs() != 0 && 
executionOptions.getBatchSize() != 1) {
             this.scheduler = Executors.newScheduledThreadPool(1, new 
ExecutorThreadFactory("doris-streamload-output" +
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
index 7033dbd9..d11c2217 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
@@ -147,6 +147,11 @@ public final class DorisDynamicTableFactory implements 
DynamicTableSourceFactory
             .defaultValue(DorisExecutionOptions.DEFAULT_MAX_BATCH_BYTES)
             .withDescription("the flush max bytes (includes all append, upsert 
and delete records), over this number" +
                     " in batch, will flush data. The default value is 10MB.");
+    private static final ConfigOption<String> SINK_LABEL_PREFIX = ConfigOptions
+            .key("sink.label-prefix")
+            .stringType()
+            .noDefaultValue()
+            .withDescription("the label prefix for stream load, the default 
value is flink_connector");
 
     @Override
     public String factoryIdentifier() {
@@ -186,6 +191,7 @@ public final class DorisDynamicTableFactory implements 
DynamicTableSourceFactory
         options.add(SINK_BUFFER_FLUSH_INTERVAL);
         options.add(SINK_ENABLE_DELETE);
         options.add(SINK_BUFFER_FLUSH_MAX_BYTES);
+        options.add(SINK_LABEL_PREFIX);
         return options;
     }
 
@@ -243,6 +249,7 @@ public final class DorisDynamicTableFactory implements 
DynamicTableSourceFactory
         builder.setStreamLoadProp(streamLoadProp);
         builder.setEnableDelete(readableConfig.get(SINK_ENABLE_DELETE));
         
builder.setMaxBatchBytes(readableConfig.get(SINK_BUFFER_FLUSH_MAX_BYTES));
+        
readableConfig.getOptional(SINK_LABEL_PREFIX).ifPresent(builder::setLabelPrefix);
         return builder.build();
     }
 
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisStreamLoad.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisStreamLoad.java
index 72cf8ffc..b22d2fba 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisStreamLoad.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisStreamLoad.java
@@ -40,10 +40,8 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.io.Serializable;
 import java.nio.charset.StandardCharsets;
-import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Date;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -68,9 +66,11 @@ public class DorisStreamLoad implements Serializable {
     private String tbl;
     private String authEncoding;
     private Properties streamLoadProp;
+    private static final String LABEL_PREFIX_DEFAULT = "flink_connector";
+    private String labelPrefix;
     private final HttpClientBuilder httpClientBuilder;
 
-    public DorisStreamLoad(String hostPort, String db, String tbl, String 
user, String passwd, Properties streamLoadProp, DorisReadOptions readOptions) {
+    public DorisStreamLoad(String hostPort, String db, String tbl, String 
user, String passwd, Properties streamLoadProp, DorisReadOptions readOptions, 
String labelPrefix) {
         this.hostPort = hostPort;
         this.db = db;
         this.tbl = tbl;
@@ -79,6 +79,7 @@ public class DorisStreamLoad implements Serializable {
         this.loadUrlStr = String.format(loadUrlPattern, hostPort, db, tbl);
         this.authEncoding = basicAuthHeader(user, passwd);
         this.streamLoadProp = streamLoadProp;
+        this.labelPrefix = StringUtils.isBlank(labelPrefix) ? 
LABEL_PREFIX_DEFAULT : labelPrefix;
         int connectTimeout = readOptions.getRequestConnectTimeoutMs() == null 
? ConfigurationOptions.DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT : 
readOptions.getRequestConnectTimeoutMs();
         int socketTimeout = readOptions.getRequestReadTimeoutMs() == null ? 
ConfigurationOptions.DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT : 
readOptions.getRequestReadTimeoutMs();
         this.httpClientBuilder = HttpClients
@@ -127,9 +128,7 @@ public class DorisStreamLoad implements Serializable {
     private LoadResponse loadBatch(String value) {
         String label = streamLoadProp.getProperty("label");
         if (StringUtils.isBlank(label)) {
-            SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMdd_HHmmss");
-            String formatDate = sdf.format(new Date());
-            label = String.format("flink_connector_%s_%s", formatDate,
+            label = String.format("%s_%s", labelPrefix,
                     UUID.randomUUID().toString().replaceAll("-", ""));
         }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to