This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 182c274 [Improvement] add option ignore commit error (#327)
182c274 is described below
commit 182c274710cbfed6686afc22de273adba2e846df
Author: wudi <[email protected]>
AuthorDate: Mon Mar 4 17:15:26 2024 +0800
[Improvement] add option ignore commit error (#327)
---
.../doris/flink/cfg/DorisExecutionOptions.java | 18 ++++++++--
.../org/apache/doris/flink/sink/DorisSink.java | 3 +-
.../doris/flink/sink/committer/DorisCommitter.java | 41 ++++++++++++++++------
.../doris/flink/table/DorisConfigOptions.java | 7 ++++
.../flink/table/DorisDynamicTableFactory.java | 3 ++
.../apache/doris/flink/tools/cdc/DatabaseSync.java | 3 ++
.../flink/sink/committer/TestDorisCommitter.java | 5 ++-
7 files changed, 65 insertions(+), 15 deletions(-)
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
index a890d34..b1b49d1 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
@@ -63,6 +63,7 @@ public class DorisExecutionOptions implements Serializable {
private final boolean enableBatchMode;
private final boolean ignoreUpdateBefore;
private final WriteMode writeMode;
+ private final boolean ignoreCommitError;
public DorisExecutionOptions(
int checkInterval,
@@ -81,7 +82,8 @@ public class DorisExecutionOptions implements Serializable {
long bufferFlushIntervalMs,
boolean ignoreUpdateBefore,
boolean force2PC,
- WriteMode writeMode) {
+ WriteMode writeMode,
+ boolean ignoreCommitError) {
Preconditions.checkArgument(maxRetries >= 0);
this.checkInterval = checkInterval;
this.maxRetries = maxRetries;
@@ -102,6 +104,7 @@ public class DorisExecutionOptions implements Serializable {
this.ignoreUpdateBefore = ignoreUpdateBefore;
this.writeMode = writeMode;
+ this.ignoreCommitError = ignoreCommitError;
}
public static Builder builder() {
@@ -205,6 +208,10 @@ public class DorisExecutionOptions implements Serializable
{
return writeMode;
}
+ public boolean ignoreCommitError() {
+ return ignoreCommitError;
+ }
+
/** Builder of {@link DorisExecutionOptions}. */
public static class Builder {
private int checkInterval = DEFAULT_CHECK_INTERVAL;
@@ -229,6 +236,7 @@ public class DorisExecutionOptions implements Serializable {
private boolean ignoreUpdateBefore = true;
private WriteMode writeMode = WriteMode.STREAM_LOAD;
+ private boolean ignoreCommitError = false;
public Builder setCheckInterval(Integer checkInterval) {
this.checkInterval = checkInterval;
@@ -320,6 +328,11 @@ public class DorisExecutionOptions implements Serializable
{
return this;
}
+ public Builder setIgnoreCommitError(boolean ignoreCommitError) {
+ this.ignoreCommitError = ignoreCommitError;
+ return this;
+ }
+
public DorisExecutionOptions build() {
// If format=json is set but read_json_by_line is not set, record may not be written.
if (streamLoadProp != null
@@ -344,7 +357,8 @@ public class DorisExecutionOptions implements Serializable {
bufferFlushIntervalMs,
ignoreUpdateBefore,
force2PC,
- writeMode);
+ writeMode,
+ ignoreCommitError);
}
}
}
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/DorisSink.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/DorisSink.java
index e9ba2eb..2f00c9c 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/DorisSink.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/DorisSink.java
@@ -89,8 +89,7 @@ public class DorisSink<IN>
public Committer createCommitter() throws IOException {
if (WriteMode.STREAM_LOAD.equals(dorisExecutionOptions.getWriteMode())
||
WriteMode.STREAM_LOAD_BATCH.equals(dorisExecutionOptions.getWriteMode())) {
- return new DorisCommitter(
- dorisOptions, dorisReadOptions,
dorisExecutionOptions.getMaxRetries());
+ return new DorisCommitter(dorisOptions, dorisReadOptions,
dorisExecutionOptions);
} else if
(WriteMode.COPY.equals(dorisExecutionOptions.getWriteMode())) {
return new DorisCopyCommitter(dorisOptions,
dorisExecutionOptions.getMaxRetries());
}
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java
index 3959c26..1fbab85 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java
@@ -18,10 +18,12 @@
package org.apache.doris.flink.sink.committer;
import org.apache.flink.api.connector.sink2.Committer;
+import org.apache.flink.util.Preconditions;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.lang3.StringUtils;
+import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
import org.apache.doris.flink.exception.DorisRuntimeException;
@@ -58,20 +60,25 @@ public class DorisCommitter implements
Committer<DorisCommittable>, Closeable {
private final BackendUtil backendUtil;
int maxRetry;
+ final boolean ignoreCommitError;
public DorisCommitter(
- DorisOptions dorisOptions, DorisReadOptions dorisReadOptions, int
maxRetry) {
- this(dorisOptions, dorisReadOptions, maxRetry, new
HttpUtil().getHttpClient());
+ DorisOptions dorisOptions,
+ DorisReadOptions dorisReadOptions,
+ DorisExecutionOptions executionOptions) {
+ this(dorisOptions, dorisReadOptions, executionOptions, new
HttpUtil().getHttpClient());
}
public DorisCommitter(
DorisOptions dorisOptions,
DorisReadOptions dorisReadOptions,
- int maxRetry,
+ DorisExecutionOptions executionOptions,
CloseableHttpClient client) {
this.dorisOptions = dorisOptions;
this.dorisReadOptions = dorisReadOptions;
- this.maxRetry = maxRetry;
+ Preconditions.checkArgument(maxRetry >= 0);
+ this.maxRetry = executionOptions.getMaxRetries();
+ this.ignoreCommitError = executionOptions.ignoreCommitError();
this.httpClient = client;
this.backendUtil =
StringUtils.isNotEmpty(dorisOptions.getBenodes())
@@ -99,8 +106,8 @@ public class DorisCommitter implements
Committer<DorisCommittable>, Closeable {
// hostPort
String hostPort = committable.getHostPort();
-
LOG.info("commit txn {} to host {}", committable.getTxnID(), hostPort);
+ Throwable ex = new Throwable();
int retry = 0;
while (retry <= maxRetry) {
// get latest-url
@@ -130,17 +137,31 @@ public class DorisCommitter implements
Committer<DorisCommittable>, Closeable {
String reasonPhrase = statusLine.getReasonPhrase();
LOG.error("commit failed with {}, reason {}", hostPort,
reasonPhrase);
if (retry == maxRetry) {
- throw new DorisRuntimeException("commit transaction error:
" + reasonPhrase);
+ ex = new DorisRuntimeException("commit transaction error:
" + reasonPhrase);
}
hostPort = backendUtil.getAvailableBackend();
} catch (Exception e) {
LOG.error("commit transaction failed, to retry, {}",
e.getMessage());
- if (retry == maxRetry) {
- throw new DorisRuntimeException("commit transaction error,
", e);
- }
+ ex = e;
hostPort = backendUtil.getAvailableBackend();
}
- retry++;
+
+ if (retry++ >= maxRetry) {
+ if (ignoreCommitError) {
+ // Generally used when txn(stored in checkpoint) expires and unexpected
+ // errors occur in commit.
+
+ // It should be noted that you must manually ensure that the txn has been
+ // successfully submitted to doris, otherwise there may be a risk of data
+ // loss.
+ LOG.error(
+ "Unable to commit transaction {} and data has been
potentially lost ",
+ committable,
+ ex);
+ } else {
+ throw new DorisRuntimeException("commit transaction error,
", ex);
+ }
+ }
}
}
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
index 2c7c753..c698871 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
@@ -235,6 +235,13 @@ public class DorisConfigOptions {
.defaultValue(WriteMode.STREAM_LOAD.name())
.withDescription("Write mode, supports stream_load,
stream_load_batch");
+ public static final ConfigOption<Boolean> SINK_IGNORE_COMMIT_ERROR =
+ ConfigOptions.key("sink.ignore.commit-error")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "Whether to ignore commit errors. Usually used when the checkpoint cannot be restored to skip the commit of txn. The default is false.");
+
public static final ConfigOption<Integer> SINK_PARALLELISM =
FactoryUtil.SINK_PARALLELISM;
public static final ConfigOption<Boolean> SINK_ENABLE_BATCH_MODE =
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
index c7d13f6..bf5cd8c 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
@@ -73,6 +73,7 @@ import static
org.apache.doris.flink.table.DorisConfigOptions.SINK_ENABLE_2PC;
import static
org.apache.doris.flink.table.DorisConfigOptions.SINK_ENABLE_BATCH_MODE;
import static
org.apache.doris.flink.table.DorisConfigOptions.SINK_ENABLE_DELETE;
import static
org.apache.doris.flink.table.DorisConfigOptions.SINK_FLUSH_QUEUE_SIZE;
+import static
org.apache.doris.flink.table.DorisConfigOptions.SINK_IGNORE_COMMIT_ERROR;
import static
org.apache.doris.flink.table.DorisConfigOptions.SINK_IGNORE_UPDATE_BEFORE;
import static
org.apache.doris.flink.table.DorisConfigOptions.SINK_LABEL_PREFIX;
import static org.apache.doris.flink.table.DorisConfigOptions.SINK_MAX_RETRIES;
@@ -156,6 +157,7 @@ public final class DorisDynamicTableFactory
options.add(SOURCE_USE_OLD_API);
options.add(SINK_WRITE_MODE);
+ options.add(SINK_IGNORE_COMMIT_ERROR);
return options;
}
@@ -226,6 +228,7 @@ public final class DorisDynamicTableFactory
builder.setStreamLoadProp(streamLoadProp);
builder.setDeletable(readableConfig.get(SINK_ENABLE_DELETE));
builder.setIgnoreUpdateBefore(readableConfig.get(SINK_IGNORE_UPDATE_BEFORE));
+
builder.setIgnoreCommitError(readableConfig.get(SINK_IGNORE_COMMIT_ERROR));
if (!readableConfig.get(SINK_ENABLE_2PC)) {
builder.disable2PC();
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
index cd5ef1a..d713355 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
@@ -290,6 +290,9 @@ public abstract class DatabaseSync {
sinkConfig
.getOptional(DorisConfigOptions.SINK_WRITE_MODE)
.ifPresent(v ->
executionBuilder.setWriteMode(WriteMode.of(v)));
+ sinkConfig
+ .getOptional(DorisConfigOptions.SINK_IGNORE_COMMIT_ERROR)
+ .ifPresent(executionBuilder::setIgnoreCommitError);
DorisExecutionOptions executionOptions = executionBuilder.build();
builder.setDorisReadOptions(DorisReadOptions.builder().build())
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/committer/TestDorisCommitter.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/committer/TestDorisCommitter.java
index f5c82bb..be5bb0d 100644
---
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/committer/TestDorisCommitter.java
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/committer/TestDorisCommitter.java
@@ -17,6 +17,7 @@
package org.apache.doris.flink.sink.committer;
+import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
import org.apache.doris.flink.exception.DorisRuntimeException;
@@ -59,6 +60,7 @@ public class TestDorisCommitter {
public void setUp() throws Exception {
DorisOptions dorisOptions = OptionUtils.buildDorisOptions();
DorisReadOptions readOptions = OptionUtils.buildDorisReadOptions();
+ DorisExecutionOptions executionOptions =
OptionUtils.buildExecutionOptional();
dorisCommittable = new DorisCommittable("127.0.0.1:8710", "test", 0);
CloseableHttpClient httpClient = mock(CloseableHttpClient.class);
entityMock = new HttpEntityMock();
@@ -78,7 +80,8 @@ public class TestDorisCommitter {
BackendV2.BackendRowV2.of("127.0.0.1", 8040,
true)));
backendUtilMockedStatic.when(() ->
BackendUtil.tryHttpConnection(any())).thenReturn(true);
- dorisCommitter = new DorisCommitter(dorisOptions, readOptions, 3,
httpClient);
+ dorisCommitter =
+ new DorisCommitter(dorisOptions, readOptions,
executionOptions, httpClient);
}
@Test
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]