This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
new b73ca39 [fix] fix the enable.batch-mode option not working (#372)
b73ca39 is described below
commit b73ca3925a7a781f620b8cac9cf3c0e958f32090
Author: Petrichor <[email protected]>
AuthorDate: Sat Apr 27 13:08:13 2024 +0800
[fix] fix the enable.batch-mode option not working (#372)
* fix the enable.batch-mode option not working
---
.../main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java | 3 +++
.../src/main/java/org/apache/doris/flink/cfg/DorisStreamOptions.java | 2 +-
.../main/java/org/apache/doris/flink/sink/copy/DorisCopyWriter.java | 2 +-
.../src/test/java/org/apache/doris/flink/DorisSinkBatchExample.java | 2 +-
4 files changed, 6 insertions(+), 3 deletions(-)
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
index c5a9f66..779a296 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
@@ -293,6 +293,9 @@ public class DorisExecutionOptions implements Serializable {
public Builder setBatchMode(Boolean enableBatchMode) {
this.enableBatchMode = enableBatchMode;
+ if (enableBatchMode.equals(Boolean.TRUE)) {
+ this.writeMode = WriteMode.STREAM_LOAD_BATCH;
+ }
return this;
}
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisStreamOptions.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisStreamOptions.java
index 1789fdf..89e9182 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisStreamOptions.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisStreamOptions.java
@@ -24,7 +24,7 @@ import java.util.Properties;
public class DorisStreamOptions implements Serializable {
private static final long serialVersionUID = 1L;
- private Properties prop;
+ private final Properties prop;
private DorisOptions options;
private DorisReadOptions readOptions;
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/copy/DorisCopyWriter.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/copy/DorisCopyWriter.java
index 686b8ee..1b3a089 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/copy/DorisCopyWriter.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/copy/DorisCopyWriter.java
@@ -82,7 +82,7 @@ public class DorisCopyWriter<IN>
.getRestoredCheckpointId()
.orElse(CheckpointIDCounter.INITIAL_CHECKPOINT_ID - 1);
LOG.info("restore checkpointId {}", lastCheckpointId);
- LOG.info("labelPrefix " + executionOptions.getLabelPrefix());
+ LOG.info("labelPrefix {}", executionOptions.getLabelPrefix());
this.labelPrefix =
executionOptions.getLabelPrefix()
+ "_"
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSinkBatchExample.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSinkBatchExample.java
index 46c3185..47add12 100644
---
a/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSinkBatchExample.java
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSinkBatchExample.java
@@ -73,8 +73,8 @@ public class DorisSinkBatchExample {
.setBufferFlushMaxBytes(8 * 1024)
.setBufferFlushMaxRows(900)
.setBufferFlushIntervalMs(1000 * 10)
+ // .setBatchMode(true);
.setWriteMode(WriteMode.STREAM_LOAD_BATCH);
-
builder.setDorisReadOptions(readOptionBuilder.build())
.setDorisExecutionOptions(executionBuilder.build())
.setSerializer(new SimpleStringSerializer())
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]