This is an automated email from the ASF dual-hosted git repository.
jiafengzheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 9f06d7a improve check commit success (#210)
9f06d7a is described below
commit 9f06d7a79f0bebe4bbb6004032cc018c17eb55d8
Author: wudi <[email protected]>
AuthorDate: Tue Oct 17 10:43:08 2023 +0800
improve check commit success (#210)
---
.../org/apache/doris/flink/sink/committer/DorisCommitter.java | 9 ++++-----
1 file changed, 4 insertions(+), 5 deletions(-)
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java
index 5bb1a40..0e19b0f 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java
@@ -17,11 +17,9 @@
package org.apache.doris.flink.sink.committer;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.flink.api.connector.sink.Committer;
-
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.lang3.StringUtils;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
import org.apache.doris.flink.exception.DorisRuntimeException;
@@ -31,6 +29,7 @@ import org.apache.doris.flink.sink.DorisCommittable;
import org.apache.doris.flink.sink.HttpPutBuilder;
import org.apache.doris.flink.sink.HttpUtil;
import org.apache.doris.flink.sink.ResponseUtil;
+import org.apache.flink.api.connector.sink.Committer;
import org.apache.http.StatusLine;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPut;
@@ -45,7 +44,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import static org.apache.doris.flink.sink.LoadStatus.FAIL;
+import static org.apache.doris.flink.sink.LoadStatus.SUCCESS;
/**
* The committer to commit transaction.
@@ -109,7 +108,7 @@ public class DorisCommitter implements Committer<DorisCommittable> {
String loadResult =
EntityUtils.toString(response.getEntity());
Map<String, String> res =
jsonMapper.readValue(loadResult, new TypeReference<HashMap<String, String>>() {
});
- if (res.get("status").equals(FAIL) && !ResponseUtil.isCommitted(res.get("msg"))) {
+ if (!res.get("status").equals(SUCCESS) && !ResponseUtil.isCommitted(res.get("msg"))) {
throw new DorisRuntimeException("Commit failed " +
loadResult);
} else {
LOG.info("load result {}", loadResult);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]