This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 5150d77 [fix](connector) Fixed the issue where the loading task got stuck when stream load ended unexpectedly (#305)
5150d77 is described below
commit 5150d7780dfcd0bfd5b113325267fa3bffcdd0be
Author: gnehil <[email protected]>
AuthorDate: Thu Apr 10 15:36:19 2025 +0800
[fix](connector) Fixed the issue where the loading task got stuck when stream load ended unexpectedly (#305)
---
.../client/write/AbstractStreamLoadProcessor.java | 181 +++++++++++++--------
.../apache/doris/spark/config/DorisOptions.java | 4 +-
.../spark/sql/DorisWriterFailoverITCase.scala | 41 ++++-
.../apache/doris/spark/write/DorisDataWriter.scala | 4 +-
4 files changed, 151 insertions(+), 79 deletions(-)
diff --git a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/AbstractStreamLoadProcessor.java b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/AbstractStreamLoadProcessor.java
index 169f5a8..eec3f73 100644
--- a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/AbstractStreamLoadProcessor.java
+++ b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/AbstractStreamLoadProcessor.java
@@ -17,12 +17,6 @@
package org.apache.doris.spark.client.write;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.json.JsonMapper;
-import java.io.PipedInputStream;
-import java.io.PipedOutputStream;
import org.apache.doris.spark.client.DorisBackendHttpClient;
import org.apache.doris.spark.client.DorisFrontendClient;
import org.apache.doris.spark.client.entity.Backend;
@@ -36,6 +30,10 @@ import org.apache.doris.spark.util.EscapeHandler;
import org.apache.doris.spark.util.HttpUtils;
import org.apache.doris.spark.util.URLs;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.json.JsonMapper;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHeaders;
import org.apache.http.HttpStatus;
@@ -51,6 +49,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.io.PipedInputStream;
+import java.io.PipedOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashSet;
@@ -66,56 +66,39 @@ import java.util.stream.Collectors;
public abstract class AbstractStreamLoadProcessor<R> extends DorisWriter<R> implements DorisCommitter {
- protected final Logger logger = LoggerFactory.getLogger(this.getClass().getName().replaceAll("\\$", ""));
-
- protected static final JsonMapper MAPPER = JsonMapper.builder().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false).build();
-
+ protected static final JsonMapper MAPPER =
+ JsonMapper.builder().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false).build();
private static final String PARTIAL_COLUMNS = "partial_columns";
private static final String GROUP_COMMIT = "group_commit";
- private static final Set<String> VALID_GROUP_MODE = new HashSet<>(Arrays.asList("sync_mode", "async_mode", "off_mode"));
-
+ private static final Set<String> VALID_GROUP_MODE =
+ new HashSet<>(Arrays.asList("sync_mode", "async_mode", "off_mode"));
+ private static final int arrowBufferSize = 1000;
+ protected final Logger logger = LoggerFactory.getLogger(this.getClass().getName().replaceAll("\\$", ""));
protected final DorisConfig config;
-
private final DorisFrontendClient frontend;
private final DorisBackendHttpClient backendHttpClient;
-
private final String database;
private final String table;
-
private final boolean autoRedirect;
-
private final boolean isHttpsEnabled;
-
private final boolean isTwoPhaseCommitEnabled;
-
private final Map<String, String> properties;
-
private final DataFormat format;
-
+ private final boolean isGzipCompressionEnabled;
+ private final boolean isPassThrough;
+ private final List<R> recordBuffer = new LinkedList<>();
+ private final int pipeSize;
protected String columnSeparator;
-
private byte[] lineDelimiter;
-
- private final boolean isGzipCompressionEnabled;
-
private String groupCommit;
-
- private final boolean isPassThrough;
-
private PipedOutputStream output;
-
private boolean createNewBatch = true;
-
private boolean isFirstRecordOfBatch = true;
-
- private final List<R> recordBuffer = new LinkedList<>();
-
- private static final int arrowBufferSize = 1000;
-
private transient ExecutorService executor;
- private Future<CloseableHttpResponse> requestFuture = null;
+ private Future<StreamLoadResponse> requestFuture = null;
private volatile String currentLabel;
+ private Exception unexpectedException = null;
public AbstractStreamLoadProcessor(DorisConfig config) throws Exception {
super(config.getValue(DorisOptions.DORIS_SINK_BATCH_SIZE));
@@ -132,22 +115,31 @@ public abstract class AbstractStreamLoadProcessor<R> extends DorisWriter<R> impl
// init stream load props
this.isTwoPhaseCommitEnabled = config.getValue(DorisOptions.DORIS_SINK_ENABLE_2PC);
this.format = DataFormat.valueOf(properties.getOrDefault("format", "csv").toUpperCase());
- this.isGzipCompressionEnabled = properties.containsKey("compress_type") && "gzip".equals(properties.get("compress_type"));
+ this.isGzipCompressionEnabled =
+ properties.containsKey("compress_type") && "gzip".equals(properties.get("compress_type"));
if (properties.containsKey(GROUP_COMMIT)) {
String message = "";
- if (isTwoPhaseCommitEnabled) message = "group commit does not support two-phase commit";
- if (properties.containsKey(PARTIAL_COLUMNS) && "true".equalsIgnoreCase(properties.get(PARTIAL_COLUMNS)))
+ if (isTwoPhaseCommitEnabled) {
+ message = "group commit does not support two-phase commit";
+ }
+ if (properties.containsKey(PARTIAL_COLUMNS) && "true".equalsIgnoreCase(properties.get(PARTIAL_COLUMNS))) {
message = "group commit does not support partial column updates";
- if (!VALID_GROUP_MODE.contains(properties.get(GROUP_COMMIT).toLowerCase()))
+ }
+ if (!VALID_GROUP_MODE.contains(properties.get(GROUP_COMMIT).toLowerCase())) {
message = "Unsupported group commit mode: " + properties.get(GROUP_COMMIT);
- if (!message.isEmpty()) throw new IllegalArgumentException(message);
+ }
+ if (!message.isEmpty()) {
+ throw new IllegalArgumentException(message);
+ }
groupCommit = properties.get(GROUP_COMMIT).toLowerCase();
}
this.isPassThrough = config.getValue(DorisOptions.DORIS_SINK_STREAMING_PASSTHROUGH);
+ this.pipeSize = config.getValue(DorisOptions.DORIS_SINK_NET_BUFFER_SIZE);
}
public void load(R row) throws Exception {
if (createNewBatch) {
+ createNewBatch = false;
if (autoRedirect) {
requestFuture = frontend.requestFrontends((frontEnd, httpClient) ->
buildReqAndExec(frontEnd.getHost(), frontEnd.getHttpPort(), httpClient));
@@ -155,14 +147,13 @@ public abstract class AbstractStreamLoadProcessor<R> extends DorisWriter<R> impl
requestFuture = backendHttpClient.executeReq((backend, httpClient) ->
buildReqAndExec(backend.getHost(), backend.getHttpPort(), httpClient));
}
- createNewBatch = false;
}
if (isFirstRecordOfBatch) {
isFirstRecordOfBatch = false;
- } else if (lineDelimiter != null){
- output.write(lineDelimiter);
+ } else if (lineDelimiter != null) {
+ writeTo(lineDelimiter);
}
- output.write(toFormat(row, format));
+ writeTo(toFormat(row, format));
currentBatchCount++;
}
@@ -175,23 +166,24 @@ public abstract class AbstractStreamLoadProcessor<R> extends DorisWriter<R> impl
if (!recordBuffer.isEmpty() && DataFormat.ARROW.equals(format)) {
List<R> rs = new LinkedList<>(recordBuffer);
recordBuffer.clear();
- output.write(toArrowFormat(rs));
+ writeTo(toArrowFormat(rs));
}
output.close();
logger.info("stream load stopped with {}", currentLabel != null ?
currentLabel : "group commit");
- CloseableHttpResponse res = requestFuture.get();
- if (res.getStatusLine().getStatusCode() != HttpStatus.SC_OK) {
- throw new StreamLoadException("stream load execute failed, status: " + res.getStatusLine().getStatusCode()
- + ", msg: " + res.getStatusLine().getReasonPhrase());
- }
- String resEntity = EntityUtils.toString(new BufferedHttpEntity(res.getEntity()));
- logger.info("stream load response: {}", resEntity);
- StreamLoadResponse response = MAPPER.readValue(resEntity, StreamLoadResponse.class);
- if (response != null && response.isSuccess()) {
- return isTwoPhaseCommitEnabled ? String.valueOf(response.getTxnId()) : null;
- } else {
- throw new StreamLoadException("stream load execute failed, response: " + resEntity);
+
+ StreamLoadResponse response;
+ try {
+ response = requestFuture.get();
+ if (response == null) {
+ throw new StreamLoadException("response is null");
+ }
+ } catch (Exception e) {
+ if (unexpectedException != null) {
+ throw unexpectedException;
+ }
+ throw new StreamLoadException("stream load stop failed", e);
}
+ return isTwoPhaseCommitEnabled ? String.valueOf(response.getTxnId()) : null;
}
return null;
}
@@ -211,8 +203,9 @@ public abstract class AbstractStreamLoadProcessor<R> extends DorisWriter<R> impl
.getReasonPhrase());
} else {
String resEntity = EntityUtils.toString(new BufferedHttpEntity(response.getEntity()));
- if(!checkTransResponse(resEntity)) {
- throw new RuntimeException("commit transaction failed, transaction: " + msg + ", resp: " + resEntity);
+ if (!checkTransResponse(resEntity)) {
+ throw new RuntimeException(
+ "commit transaction failed, transaction: " + msg +
", resp: " + resEntity);
} else {
this.logger.info("commit: {} response: {}", msg,
resEntity);
}
@@ -256,8 +249,9 @@ public abstract class AbstractStreamLoadProcessor<R> extends DorisWriter<R> impl
.getReasonPhrase());
} else {
String resEntity = EntityUtils.toString(new BufferedHttpEntity(response.getEntity()));
- if(!checkTransResponse(resEntity)) {
- throw new RuntimeException("abort transaction failed, transaction: " + msg + ", resp: " + resEntity);
+ if (!checkTransResponse(resEntity)) {
+ throw new RuntimeException(
+ "abort transaction failed, transaction: " + msg +
", resp: " + resEntity);
} else {
this.logger.info("abort: {} response: {}", msg, resEntity);
}
@@ -271,11 +265,11 @@ public abstract class AbstractStreamLoadProcessor<R> extends DorisWriter<R> impl
ObjectMapper objectMapper = new ObjectMapper();
try {
String status = objectMapper.readTree(resEntity).get("status").asText();
- if ("Success".equalsIgnoreCase(status)) {
- return true;
- }
+ if ("Success".equalsIgnoreCase(status)) {
+ return true;
+ }
} catch (JsonProcessingException e) {
- logger.warn("invalid json response: " + resEntity, e);
+ logger.warn("invalid json response: " + resEntity, e);
}
return false;
}
@@ -341,7 +335,9 @@ public abstract class AbstractStreamLoadProcessor<R> extends DorisWriter<R> impl
if (config.contains(DorisOptions.DORIS_MAX_FILTER_RATIO)) {
httpPut.setHeader("max_filter_ratio", config.getValue(DorisOptions.DORIS_MAX_FILTER_RATIO));
}
- if (isTwoPhaseCommitEnabled) httpPut.setHeader("two_phase_commit", "true");
+ if (isTwoPhaseCommitEnabled) {
+ httpPut.setHeader("two_phase_commit", "true");
+ }
switch (format) {
case CSV:
@@ -352,7 +348,7 @@ public abstract class AbstractStreamLoadProcessor<R> extends DorisWriter<R> impl
break;
case JSON:
lineDelimiter = properties.getOrDefault("line_delimiter", "\n").getBytes(
- StandardCharsets.UTF_8);
+ StandardCharsets.UTF_8);
properties.put("read_json_by_line", "true");
break;
case ARROW:
@@ -384,14 +380,14 @@ public abstract class AbstractStreamLoadProcessor<R> extends DorisWriter<R> impl
protected abstract String generateStreamLoadLabel() throws OptionRequiredException;
- private Future<CloseableHttpResponse> buildReqAndExec(String host, Integer port, CloseableHttpClient client) {
+ private Future<StreamLoadResponse> buildReqAndExec(String host, Integer port, CloseableHttpClient client) {
HttpPut httpPut = new HttpPut(URLs.streamLoad(host, port, database, table, isHttpsEnabled));
try {
handleStreamLoadProperties(httpPut);
} catch (OptionRequiredException e) {
throw new RuntimeException("stream load handle properties failed",
e);
}
- PipedInputStream pipedInputStream = new PipedInputStream(4096);
+ PipedInputStream pipedInputStream = new PipedInputStream(pipeSize);
try {
output = new PipedOutputStream(pipedInputStream);
} catch (IOException e) {
@@ -402,15 +398,44 @@ public abstract class AbstractStreamLoadProcessor<R> extends DorisWriter<R> impl
entity = new GzipCompressingEntity(entity);
}
httpPut.setEntity(entity);
-
- logger.info("table {}.{} stream load started for {} on host {}:{}",
database, table, currentLabel != null ? currentLabel : "group commit", host,
port);
- return getExecutors().submit(() -> client.execute(httpPut));
+ Thread currentThread = Thread.currentThread();
+
+ logger.info("table {}.{} stream load started for {} on host {}:{}",
database, table,
+ currentLabel != null ? currentLabel : "group commit", host,
port);
+ return getExecutors().submit(() -> {
+ StreamLoadResponse streamLoadResponse = null;
+ try (CloseableHttpResponse response = client.execute(httpPut)) {
+ // stream load http request finished unexpectedly
+ if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) {
+ throw new StreamLoadException(
+ "stream load failed, status: " +
response.getStatusLine().getStatusCode()
+ + ", reason: " +
response.getStatusLine().getReasonPhrase());
+ }
+ String entityStr = EntityUtils.toString(response.getEntity());
+ streamLoadResponse = MAPPER.readValue(entityStr, StreamLoadResponse.class);
+ logger.info("stream load response: " + entityStr);
+ if (streamLoadResponse == null) {
+ throw new StreamLoadException("stream load failed, response is null, response: " + entityStr);
+ } else if (!streamLoadResponse.isSuccess()) {
+ throw new StreamLoadException(
+ "stream load failed, txnId: " +
streamLoadResponse.getTxnId()
+ + ", status: " +
streamLoadResponse.getStatus()
+ + ", msg: " +
streamLoadResponse.getMessage());
+ }
+ } catch (Exception e) {
+ logger.error("stream load exception", e);
+ unexpectedException = e;
+ currentThread.interrupt();
+ }
+ return streamLoadResponse;
+ });
}
@Override
public void close() throws IOException {
createNewBatch = true;
isFirstRecordOfBatch = true;
+ unexpectedException = null;
frontend.close();
if (backendHttpClient != null) {
backendHttpClient.close();
@@ -447,4 +472,16 @@ public abstract class AbstractStreamLoadProcessor<R> extends DorisWriter<R> impl
return executor;
}
+ private void writeTo(byte[] bytes) throws Exception {
+ try {
+ output.write(bytes);
+ } catch (Exception e) {
+ if (unexpectedException != null) {
+ throw unexpectedException;
+ } else {
+ throw e;
+ }
+ }
+ }
+
}
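
A note on the core pattern above: the background HTTP thread now records any stream-load failure in unexpectedException and interrupts the writer thread, so a writer blocked in output.write() on a full pipe fails fast with the real cause instead of hanging. Below is a minimal, self-contained sketch of that pattern; FailFastWriter and its members are hypothetical stand-ins, not the connector's actual classes:

    import java.io.IOException;
    import java.io.PipedInputStream;
    import java.io.PipedOutputStream;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Future;

    public class FailFastWriter {
        private volatile Exception unexpectedException; // set by the worker on failure
        private PipedOutputStream output;

        Future<Void> start(ExecutorService executor, int pipeSize) throws IOException {
            PipedInputStream input = new PipedInputStream(pipeSize);
            output = new PipedOutputStream(input);
            Thread writerThread = Thread.currentThread(); // the thread that will call write()
            return executor.submit(() -> {
                try {
                    input.read(); // stand-in for client.execute(httpPut) consuming the pipe
                    throw new IOException("stream load ended unexpectedly");
                } catch (Exception e) {
                    unexpectedException = e;  // remember the real cause
                    writerThread.interrupt(); // unblock a writer stuck in write()
                }
                return null;
            });
        }

        void write(byte[] bytes) throws Exception {
            try {
                output.write(bytes); // blocks while the pipe is full
            } catch (Exception e) {
                // surface the worker's failure instead of a bare InterruptedIOException
                throw unexpectedException != null ? unexpectedException : e;
            }
        }
    }

Interrupting the writer is what matters: a thread parked inside PipedOutputStream.write() can only leave via InterruptedIOException once the consuming side is gone, and rethrowing the stored exception preserves the original failure for the caller.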
diff --git a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/config/DorisOptions.java b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/config/DorisOptions.java
index a8d29ae..01dad06 100644
--- a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/config/DorisOptions.java
+++ b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/config/DorisOptions.java
@@ -65,7 +65,7 @@ public class DorisOptions {
public static final ConfigOption<String> DORIS_WRITE_FIELDS = ConfigOptions.name("doris.write.fields").stringType().withoutDefaultValue().withDescription("");
- public static final ConfigOption<Integer> DORIS_SINK_BATCH_SIZE = ConfigOptions.name("doris.sink.batch.size").intType().defaultValue(100000).withDescription("");
+ public static final ConfigOption<Integer> DORIS_SINK_BATCH_SIZE = ConfigOptions.name("doris.sink.batch.size").intType().defaultValue(500000).withDescription("");
public static final ConfigOption<Integer> DORIS_SINK_MAX_RETRIES = ConfigOptions.name("doris.sink.max-retries").intType().defaultValue(0).withDescription("");
public static final ConfigOption<Integer> DORIS_SINK_RETRY_INTERVAL_MS = ConfigOptions.name("doris.sink.retry.interval.ms").intType().defaultValue(10000).withDescription("The interval at which the Spark connector tries to load the batch of data again after load fails.");
@@ -130,5 +130,7 @@ public class DorisOptions {
public static final ConfigOption<Boolean> DORIS_READ_BITMAP_TO_BASE64 = ConfigOptions.name("doris.read.bitmap-to-base64").booleanType().defaultValue(false).withDescription("");
+ public static final ConfigOption<Integer> DORIS_SINK_NET_BUFFER_SIZE = ConfigOptions.name("doris.sink.net.buffer.size").intType().defaultValue(1024 * 1024).withDescription("");
+
}
\ No newline at end of file
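
The new doris.sink.net.buffer.size option above sizes the PipedInputStream buffer between the record writer and the stream-load HTTP entity, replacing the previously hard-coded 4096 bytes with a 1 MB default. A hedged usage sketch follows: only the option keys come from this diff, while df is assumed to be an existing Dataset<Row> and the endpoint and credential values are placeholders.

    // Sketch only: df is a Dataset<Row>; fenodes/user/password are placeholders.
    df.write().format("doris")
            .option("table.identifier", "example_db.example_tbl")
            .option("fenodes", "fe-host:8030")
            .option("user", "root")
            .option("password", "")
            // widen the writer-to-HTTP pipe from the 1 MB default to 4 MB
            .option("doris.sink.net.buffer.size", String.valueOf(4 * 1024 * 1024))
            .mode("append")
            .save();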
diff --git a/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisWriterFailoverITCase.scala b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisWriterFailoverITCase.scala
index dd73f53..9c38eed 100644
--- a/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisWriterFailoverITCase.scala
+++ b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisWriterFailoverITCase.scala
@@ -17,18 +17,21 @@
package org.apache.doris.spark.sql
-import org.apache.doris.spark.container.{AbstractContainerTestBase, ContainerUtils}
import org.apache.doris.spark.container.AbstractContainerTestBase.{assertEqualsInAnyOrder, getDorisQueryConnection}
+import org.apache.doris.spark.container.{AbstractContainerTestBase, ContainerUtils}
import org.apache.doris.spark.rest.models.DataModel
+import org.apache.spark.SparkException
import org.apache.spark.sql.SparkSession
-import org.junit.{Before, Test}
+import org.hamcrest.{CoreMatchers, Description, Matcher}
+import org.junit.rules.ExpectedException
+import org.junit.{Before, Rule, Test}
import org.slf4j.LoggerFactory
import java.util
import java.util.UUID
import java.util.concurrent.{Executors, TimeUnit}
-import scala.util.control.Breaks._
import scala.collection.JavaConverters._
+import scala.util.control.Breaks._
/**
* Test DorisWriter failover.
@@ -41,6 +44,12 @@ class DorisWriterFailoverITCase extends AbstractContainerTestBase {
val TABLE_WRITE_TBL_TASK_RETRY = "tbl_write_tbl_task_retry"
val TABLE_WRITE_TBL_PRECOMMIT_FAIL = "tbl_write_tbl_precommit_fail"
val TABLE_WRITE_TBL_COMMIT_FAIL = "tbl_write_tbl_commit_fail"
+ val TABLE_WRITE_TBL_FAIL_BEFORE_STOP = "tbl_write_tbl_fail_before_stop"
+
+ val _thrown: ExpectedException = ExpectedException.none
+
+ @Rule
+ def thrown: ExpectedException = _thrown
@Before
def setUp(): Unit = {
@@ -223,4 +232,30 @@ class DorisWriterFailoverITCase extends AbstractContainerTestBase {
LOG.info("Checking DorisWriterFailoverITCase result. testName={}, actual={}, expected={}", testName, actual, expected)
assertEqualsInAnyOrder(expected.toList.asJava, actual.toList.asJava)
}
+
+ @Test
+ def testForWriteExceptionBeforeStop(): Unit = {
+ initializeTable(TABLE_WRITE_TBL_FAIL_BEFORE_STOP, DataModel.DUPLICATE)
+ val session = SparkSession.builder().master("local[1]").getOrCreate()
+ try {
+ val df = session.createDataFrame(Seq(
+ ("doris", "cn"),
+ ("spark", "us"),
+ ("catalog", "uk")
+ )).toDF("name", "address")
+ thrown.expect(classOf[SparkException])
+ df.write.format("doris")
+ .option("table.identifier", DATABASE + "." +
TABLE_WRITE_TBL_FAIL_BEFORE_STOP)
+ .option("fenodes", getFenodes)
+ .option("user", getDorisUsername)
+ .option("password", getDorisPassword)
+ .option("doris.sink.properties.partial_columns", "true")
+ .option("doris.sink.net.buffer.size", "1")
+ .mode("append")
+ .save()
+ } finally {
+ session.stop()
+ }
+ }
+
}
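
testForWriteExceptionBeforeStop reproduces the original hang deterministically: it pairs a one-byte pipe (doris.sink.net.buffer.size = 1) with a load the server should reject (partial_columns on a duplicate-key table), so nothing drains the pipe and, before this fix, the writer blocked in write() forever. The standalone demo below (not connector code) shows why interrupting the blocked writer, as the patched worker thread now does, turns that hang into an exception:

    import java.io.InterruptedIOException;
    import java.io.PipedInputStream;
    import java.io.PipedOutputStream;

    public class PipeHangDemo {
        public static void main(String[] args) throws Exception {
            PipedInputStream in = new PipedInputStream(1); // one-byte buffer, never read
            PipedOutputStream out = new PipedOutputStream(in);
            Thread writer = Thread.currentThread();
            new Thread(() -> {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException ignored) {
                }
                writer.interrupt(); // what the patched worker thread does on failure
            }).start();
            try {
                out.write(new byte[]{1, 2}); // second byte never fits: blocks until interrupted
            } catch (InterruptedIOException e) {
                System.out.println("unblocked by interrupt instead of hanging");
            }
        }
    }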
diff --git a/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/write/DorisDataWriter.scala b/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/write/DorisDataWriter.scala
index b6edf3c..bc92a93 100644
--- a/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/write/DorisDataWriter.scala
+++ b/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/write/DorisDataWriter.scala
@@ -28,12 +28,10 @@ import org.apache.spark.sql.types.StructType
import java.time.Duration
import java.util.concurrent.locks.LockSupport
import scala.collection.mutable
-import scala.util.{Failure, Random, Success}
+import scala.util.{Failure, Success}
class DorisDataWriter(config: DorisConfig, schema: StructType, partitionId: Int, taskId: Long, epochId: Long = -1) extends DataWriter[InternalRow] with Logging {
- private val batchSize = config.getValue(DorisOptions.DORIS_SINK_BATCH_SIZE)
-
private val (writer: DorisWriter[InternalRow], committer: DorisCommitter) =
config.getValue(DorisOptions.LOAD_MODE) match {
case "stream_load" => (new StreamLoadProcessor(config, schema), new
StreamLoadProcessor(config, schema))
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]