This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 5410651 Fix data loss due to internal retries (#145)
5410651 is described below
commit 5410651e3fdcdc03ce14c09ae1b11a75f4a773ad
Author: gnehil <[email protected]>
AuthorDate: Sun Oct 8 18:25:03 2023 +0800
Fix data loss due to internal retries (#145)
---
.../apache/doris/spark/load/DorisStreamLoad.java | 96 +++++++++++++++++++---
.../spark/listener/DorisTransactionListener.scala | 2 +-
.../apache/doris/spark/writer/DorisWriter.scala | 32 +++++---
3 files changed, 106 insertions(+), 24 deletions(-)
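
In outline, the diff below removes the writer's internal per-batch retry (a retried batch re-reads an iterator the failed attempt had already partially consumed, which is the data-loss path named in the title) and, when two-phase commit is enabled, aborts a failed load's transaction by its label before surfacing the error. A minimal, dependency-free sketch of the new abort loop's shape; the class name, the abortByLabel stub, and the attempts/intervalMs parameters are illustrative stand-ins for the connector's real method and the new DORIS_SINK_TXN_RETRIES / DORIS_SINK_TXN_INTERVAL_MS settings:

    import java.time.Duration;
    import java.util.concurrent.locks.LockSupport;

    public class AbortRetrySketch {

        // Stand-in for DorisStreamLoad.abortByLabel: assumed to throw on failure.
        static void abortByLabel(String label) throws Exception {
            throw new Exception("simulated transient failure");
        }

        // Bounded abort loop mirroring the shape of the loop added to load():
        // on failure, park for the configured interval and decrement the budget.
        static void abortWithRetries(String label, int attempts, long intervalMs) {
            int retries = attempts;
            while (retries > 0) {
                try {
                    abortByLabel(label);
                    retries = 0; // success: leave the loop
                } catch (Exception ex) {
                    LockSupport.parkNanos(Duration.ofMillis(intervalMs).toNanos());
                    retries--;
                }
            }
        }

        public static void main(String[] args) {
            abortWithRetries("spark_streamload_example_label", 3, 500L);
        }
    }
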
diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java
index 0b506b0..c524a4c 100644
--- a/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java
@@ -51,6 +51,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
@@ -64,13 +65,14 @@ import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.LockSupport;
+import java.util.function.Consumer;
/**
* DorisStreamLoad
**/
public class DorisStreamLoad implements Serializable {
- private static final String NULL_VALUE = "\\N";
private static final Logger LOG = LoggerFactory.getLogger(DorisStreamLoad.class);
@@ -97,7 +99,9 @@ public class DorisStreamLoad implements Serializable {
private final String LINE_DELIMITER;
private boolean streamingPassthrough = false;
private final Integer batchSize;
- private boolean enable2PC;
+ private final boolean enable2PC;
+ private final Integer txnRetries;
+ private final Integer txnIntervalMs;
public DorisStreamLoad(SparkSettings settings) {
String[] dbTable = settings.getProperty(ConfigurationOptions.DORIS_TABLE_IDENTIFIER).split("\\.");
@@ -128,6 +132,10 @@ public class DorisStreamLoad implements Serializable {
ConfigurationOptions.SINK_BATCH_SIZE_DEFAULT);
this.enable2PC = settings.getBooleanProperty(ConfigurationOptions.DORIS_SINK_ENABLE_2PC,
ConfigurationOptions.DORIS_SINK_ENABLE_2PC_DEFAULT);
+ this.txnRetries = settings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_TXN_RETRIES,
+ ConfigurationOptions.DORIS_SINK_TXN_RETRIES_DEFAULT);
+ this.txnIntervalMs = settings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_TXN_INTERVAL_MS,
+ ConfigurationOptions.DORIS_SINK_TXN_INTERVAL_MS_DEFAULT);
}
public String getLoadUrlStr() {
@@ -202,7 +210,19 @@ public class DorisStreamLoad implements Serializable {
HttpResponse httpResponse = httpClient.execute(httpPut);
loadResponse = new LoadResponse(httpResponse);
} catch (IOException e) {
- throw new RuntimeException(e);
+ if (enable2PC) {
+ int retries = txnRetries;
+ while (retries > 0) {
+ try {
+ abortByLabel(label);
+ retries = 0;
+ } catch (StreamLoadException ex) {
+ LockSupport.parkNanos(Duration.ofMillis(txnIntervalMs).toNanos());
+ retries--;
+ }
+ }
+ }
+ throw new StreamLoadException("load execute failed", e);
}
if (loadResponse.status != HttpStatus.SC_OK) {
@@ -274,22 +294,68 @@ public class DorisStreamLoad implements Serializable {
}
- public void abort(int txnId) throws StreamLoadException {
+ /**
+ * abort transaction by id
+ *
+ * @param txnId transaction id
+ * @throws StreamLoadException
+ */
+ public void abortById(int txnId) throws StreamLoadException {
LOG.info("start abort transaction {}.", txnId);
+ try {
+ doAbort(httpPut -> httpPut.setHeader("txn_id", String.valueOf(txnId)));
+ } catch (StreamLoadException e) {
+ LOG.error("abort transaction by id: {} failed.", txnId);
+ throw e;
+ }
+
+ LOG.info("abort transaction {} succeed.", txnId);
+
+ }
+
+ /**
+ * abort transaction by label
+ *
+ * @param label label
+ * @throws StreamLoadException
+ */
+ public void abortByLabel(String label) throws StreamLoadException {
+
+ LOG.info("start abort transaction by label: {}.", label);
+
+ try {
+ doAbort(httpPut -> httpPut.setHeader("label", label));
+ } catch (StreamLoadException e) {
+ LOG.error("abort transaction by label: {} failed.", label);
+ throw e;
+ }
+
+ LOG.info("abort transaction by label {} succeed.", label);
+
+ }
+
+ /**
+ * execute abort
+ *
+ * @param putConsumer http put process function
+ * @throws StreamLoadException
+ */
+ private void doAbort(Consumer<HttpPut> putConsumer) throws StreamLoadException {
+
try (CloseableHttpClient client = getHttpClient()) {
String abortUrl = String.format(abortUrlPattern, getBackend(), db, tbl);
HttpPut httpPut = new HttpPut(abortUrl);
addCommonHeader(httpPut);
httpPut.setHeader("txn_operation", "abort");
- httpPut.setHeader("txn_id", String.valueOf(txnId));
+ putConsumer.accept(httpPut);
CloseableHttpResponse response = client.execute(httpPut);
int statusCode = response.getStatusLine().getStatusCode();
if (statusCode != 200 || response.getEntity() == null) {
- LOG.warn("abort transaction response: " + response.getStatusLine().toString());
- throw new StreamLoadException("Fail to abort transaction " + txnId + " with url " + abortUrl);
+ LOG.error("abort transaction response: " + response.getStatusLine().toString());
+ throw new IOException("Fail to abort transaction with url " + abortUrl);
}
String loadResult = EntityUtils.toString(response.getEntity());
@@ -297,17 +363,16 @@ public class DorisStreamLoad implements Serializable {
});
if (!"Success".equals(res.get("status"))) {
if (ResponseUtil.isCommitted(res.get("msg"))) {
- throw new StreamLoadException("try abort committed transaction, " + "do you recover from old savepoint?");
+ throw new IOException("try abort committed transaction");
}
- LOG.warn("Fail to abort transaction. txnId: {}, error: {}", txnId, res.get("msg"));
+ LOG.error("Fail to abort transaction. error: {}", res.get("msg"));
+ throw new IOException(String.format("Fail to abort transaction. error: %s", res.get("msg")));
}
} catch (IOException e) {
throw new StreamLoadException(e);
}
- LOG.info("abort transaction {} succeed.", txnId);
-
}
public Map<String, String> getStreamLoadProp(SparkSettings sparkSettings) {
@@ -386,12 +451,21 @@ public class DorisStreamLoad implements Serializable {
return hexData;
}
+ /**
+ * add common header to http request
+ *
+ * @param httpReq http request
+ */
private void addCommonHeader(HttpRequestBase httpReq) {
httpReq.setHeader(HttpHeaders.AUTHORIZATION, "Basic " + authEncoded);
httpReq.setHeader(HttpHeaders.EXPECT, "100-continue");
httpReq.setHeader(HttpHeaders.CONTENT_TYPE, "text/plain; charset=UTF-8");
}
+ /**
+ * handle stream sink data pass through
+ * if load format is json, set read_json_by_line to true and remove strip_outer_array parameter
+ */
private void handleStreamPassThrough() {
if ("json".equalsIgnoreCase(fileType)) {
diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/listener/DorisTransactionListener.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/listener/DorisTransactionListener.scala
index 262ad19..e5991de 100644
--- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/listener/DorisTransactionListener.scala
+++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/listener/DorisTransactionListener.scala
@@ -67,7 +67,7 @@ class DorisTransactionListener(preCommittedTxnAcc: CollectionAccumulator[Int], d
logger.info("job run failed, start aborting transactions")
txnIds.foreach(txnId =>
Utils.retry(sinkTxnRetries, Duration.ofMillis(sinkTnxIntervalMs), logger) {
- dorisStreamLoad.abort(txnId)
+ dorisStreamLoad.abortById(txnId)
} match {
case Success(_) =>
case Failure(_) => failedTxnIds += txnId
diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/writer/DorisWriter.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/writer/DorisWriter.scala
index 3fdfb79..a8c414e 100644
--- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/writer/DorisWriter.scala
+++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/writer/DorisWriter.scala
@@ -21,6 +21,7 @@ import org.apache.doris.spark.cfg.{ConfigurationOptions, SparkSettings}
import org.apache.doris.spark.listener.DorisTransactionListener
import org.apache.doris.spark.load.{CachedDorisStreamLoadClient, DorisStreamLoad}
import org.apache.doris.spark.sql.Utils
+import org.apache.spark.TaskContext
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types.StructType
@@ -31,18 +32,15 @@ import java.io.IOException
import java.time.Duration
import java.util
import java.util.Objects
+import java.util.concurrent.locks.LockSupport
import scala.collection.JavaConverters._
import scala.collection.mutable
-import scala.util.{Failure, Success}
+import scala.util.{Failure, Success, Try}
class DorisWriter(settings: SparkSettings) extends Serializable {
private val logger: Logger = LoggerFactory.getLogger(classOf[DorisWriter])
- val batchSize: Int = settings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_BATCH_SIZE,
- ConfigurationOptions.SINK_BATCH_SIZE_DEFAULT)
- private val maxRetryTimes: Int = settings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_MAX_RETRIES,
- ConfigurationOptions.SINK_MAX_RETRIES_DEFAULT)
private val sinkTaskPartitionSize: Integer = settings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_TASK_PARTITION_SIZE)
private val sinkTaskUseRepartition: Boolean = settings.getProperty(ConfigurationOptions.DORIS_SINK_TASK_USE_REPARTITION,
ConfigurationOptions.DORIS_SINK_TASK_USE_REPARTITION_DEFAULT.toString).toBoolean
@@ -50,7 +48,7 @@ class DorisWriter(settings: SparkSettings) extends Serializable {
ConfigurationOptions.DORIS_SINK_BATCH_INTERVAL_MS_DEFAULT)
private val enable2PC: Boolean = settings.getBooleanProperty(ConfigurationOptions.DORIS_SINK_ENABLE_2PC,
- ConfigurationOptions.DORIS_SINK_ENABLE_2PC_DEFAULT);
+ ConfigurationOptions.DORIS_SINK_ENABLE_2PC_DEFAULT)
private val sinkTxnIntervalMs: Int = settings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_TXN_INTERVAL_MS,
ConfigurationOptions.DORIS_SINK_TXN_INTERVAL_MS_DEFAULT)
private val sinkTxnRetries: Integer = settings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_TXN_RETRIES,
@@ -58,19 +56,28 @@ class DorisWriter(settings: SparkSettings) extends Serializable {
private val dorisStreamLoader: DorisStreamLoad = CachedDorisStreamLoadClient.getOrCreate(settings)
+ /**
+ * write data in batch mode
+ *
+ * @param dataFrame source dataframe
+ */
def write(dataFrame: DataFrame): Unit = {
doWrite(dataFrame, dorisStreamLoader.load)
}
+ /**
+ * write data in stream mode
+ *
+ * @param dataFrame source dataframe
+ */
def writeStream(dataFrame: DataFrame): Unit = {
doWrite(dataFrame, dorisStreamLoader.loadStream)
}
private def doWrite(dataFrame: DataFrame, loadFunc: (util.Iterator[InternalRow], StructType) => Int): Unit = {
-
-
val sc = dataFrame.sqlContext.sparkContext
+ logger.info(s"applicationAttemptId: ${sc.applicationAttemptId.getOrElse(-1)}")
val preCommittedTxnAcc = sc.collectionAccumulator[Int]("preCommittedTxnAcc")
if (enable2PC) {
sc.addSparkListener(new DorisTransactionListener(preCommittedTxnAcc, dorisStreamLoader, sinkTxnIntervalMs, sinkTxnRetries))
@@ -82,17 +89,18 @@ class DorisWriter(settings: SparkSettings) extends Serializable {
resultRdd = if (sinkTaskUseRepartition) resultRdd.repartition(sinkTaskPartitionSize) else resultRdd.coalesce(sinkTaskPartitionSize)
}
resultRdd.foreachPartition(iterator => {
+ val intervalNanos = Duration.ofMillis(batchInterValMs.toLong).toNanos
while (iterator.hasNext) {
- // do load batch with retries
- Utils.retry[Int, Exception](maxRetryTimes, Duration.ofMillis(batchInterValMs.toLong), logger) {
+ Try {
loadFunc(iterator.asJava, schema)
} match {
case Success(txnId) => if (enable2PC) handleLoadSuccess(txnId, preCommittedTxnAcc)
case Failure(e) =>
if (enable2PC) handleLoadFailure(preCommittedTxnAcc)
throw new IOException(
- s"Failed to load batch data on BE:
${dorisStreamLoader.getLoadUrlStr} node and exceeded the max ${maxRetryTimes}
retry times.", e)
+ s"Failed to load batch data on BE:
${dorisStreamLoader.getLoadUrlStr} node.", e)
}
+ LockSupport.parkNanos(intervalNanos)
}
})
@@ -113,7 +121,7 @@ class DorisWriter(settings: SparkSettings) extends Serializable {
val abortFailedTxnIds = mutable.Buffer[Int]()
acc.value.asScala.foreach(txnId => {
Utils.retry[Unit, Exception](sinkTxnRetries, Duration.ofMillis(sinkTxnIntervalMs), logger) {
- dorisStreamLoader.abort(txnId)
+ dorisStreamLoader.abortById(txnId)
} match {
case Success(_) =>
case Failure(_) => abortFailedTxnIds += txnId
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]