This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git
The following commit(s) were added to refs/heads/master by this push:
new fe87bd1 Extract a common method for retrying functions in Utils (#89)
fe87bd1 is described below
commit fe87bd17d9180cc7578d5280ded5c308fa4bbefb
Author: Bowen Liang <[email protected]>
AuthorDate: Mon May 29 16:27:47 2023 +0800
Extract a common method for retrying functions in Utils (#89)
* extract retry method in Utils
* revert irrelevant changes
* revert unnecessary change
---
.../apache/doris/spark/rdd/ScalaValueReader.scala | 7 ++++
.../doris/spark/sql/DorisSourceProvider.scala | 48 +++++++---------------
.../doris/spark/sql/DorisStreamLoadSink.scala | 39 +++++-------------
.../scala/org/apache/doris/spark/sql/Utils.scala | 20 +++++++++
4 files changed, 51 insertions(+), 63 deletions(-)
diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/rdd/ScalaValueReader.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/rdd/ScalaValueReader.scala
index d8ecf8d..9c12cf7 100644
--- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/rdd/ScalaValueReader.scala
+++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/rdd/ScalaValueReader.scala
@@ -24,6 +24,7 @@ import java.util.concurrent._
import java.util.concurrent.locks.{Condition, Lock, ReentrantLock}
import scala.collection.JavaConversions._
import scala.util.Try
+
import org.apache.doris.spark.backend.BackendClient
import org.apache.doris.spark.cfg.ConfigurationOptions._
import org.apache.doris.spark.cfg.Settings
@@ -36,10 +37,16 @@ import org.apache.doris.spark.util.ErrorMessages
import org.apache.doris.spark.util.ErrorMessages.SHOULD_NOT_HAPPEN_MESSAGE
import org.apache.spark.internal.Logging
+import java.util.concurrent._
+import java.util.concurrent.atomic.AtomicBoolean
+import java.util.concurrent.locks.{Condition, Lock, ReentrantLock}
+import scala.collection.JavaConversions._
+import scala.util.Try
import scala.util.control.Breaks
/**
* read data from Doris BE to array.
+ *
* @param partition Doris RDD partition
* @param settings request configuration
*/
diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala
index 671c4b8..e469f38 100644
--- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala
+++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala
@@ -27,12 +27,13 @@ import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
import org.slf4j.{Logger, LoggerFactory}
+
import java.io.IOException
+import java.time.Duration
import java.util
-import org.apache.doris.spark.rest.RestService
import java.util.Objects
import scala.collection.JavaConverters.mapAsJavaMapConverter
-import scala.util.control.Breaks
+import scala.util.{Failure, Success}
private[sql] class DorisSourceProvider extends DataSourceRegister
with RelationProvider
@@ -86,49 +87,28 @@ private[sql] class DorisSourceProvider extends DataSourceRegister
}
rowsBuffer.add(line)
if (rowsBuffer.size > maxRowCount - 1 ) {
- flush
+ flush()
}
})
// flush buffer
if (!rowsBuffer.isEmpty) {
- flush
+ flush()
}
/**
* flush data to Doris and do retry when flush error
*
*/
- def flush = {
- val loop = new Breaks
- var err: Exception = null
- loop.breakable {
-
- for (i <- 1 to maxRetryTimes) {
- try {
- dorisStreamLoader.loadV2(rowsBuffer)
- rowsBuffer.clear()
- Thread.sleep(batchInterValMs.longValue())
- loop.break()
- }
- catch {
- case e: Exception =>
- try {
- logger.debug("Failed to load data on BE: {} node ", dorisStreamLoader.getLoadUrlStr)
- if (err == null) err = e
- Thread.sleep(1000 * i)
- } catch {
- case ex: InterruptedException =>
- Thread.currentThread.interrupt()
- throw new IOException("unable to flush; interrupted while
doing another attempt", e)
- }
- }
- }
-
- if (!rowsBuffer.isEmpty) {
- throw new IOException(s"Failed to load ${maxRowCount} batch data
on BE: ${dorisStreamLoader.getLoadUrlStr} node and exceeded the max
${maxRetryTimes} retry times.", err)
- }
+ def flush(): Unit = {
+ Utils.retry[Unit, Exception](maxRetryTimes, Duration.ofMillis(batchInterValMs.toLong), logger) {
+ dorisStreamLoader.loadV2(rowsBuffer)
+ rowsBuffer.clear()
+ } match {
+ case Success(_) =>
+ case Failure(e) =>
+ throw new IOException(
+ s"Failed to load $maxRowCount batch data on BE:
${dorisStreamLoader.getLoadUrlStr} node and exceeded the max ${maxRetryTimes}
retry times.", e)
}
-
}
})
diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala
index e91e8fa..4644820 100644
--- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala
+++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala
@@ -24,11 +24,12 @@ import org.apache.spark.sql.execution.streaming.Sink
import org.apache.spark.sql.{DataFrame, Row, SQLContext}
import org.slf4j.{Logger, LoggerFactory}
-import collection.JavaConverters._
import java.io.IOException
+import java.time.Duration
import java.util
import java.util.Objects
-import scala.util.control.Breaks
+import scala.collection.JavaConverters._
+import scala.util.{Failure, Success}
private[sql] class DorisStreamLoadSink(sqlContext: SQLContext, settings: SparkSettings) extends Sink with Serializable {
@@ -69,33 +70,13 @@ private[sql] class DorisStreamLoadSink(sqlContext: SQLContext, settings: SparkSe
*
*/
def flush(batch: Iterable[util.List[Object]]): Unit = {
- val loop = new Breaks
- var err: Exception = null
- var loadSuccess: Boolean = false;
- loop.breakable {
- (1 to maxRetryTimes).foreach { i =>
- try {
- dorisStreamLoader.loadV2(batch.toList.asJava)
- loadSuccess = true
- Thread.sleep(batchInterValMs.longValue())
- loop.break()
- } catch {
- case e: Exception =>
- try {
- logger.debug("Failed to load data on BE: {} node ", dorisStreamLoader.getLoadUrlStr)
- if (err == null) err = e
- Thread.sleep(1000 * i)
- } catch {
- case ex: InterruptedException =>
- Thread.currentThread.interrupt()
- throw new IOException("unable to flush; interrupted while
doing another attempt", ex)
- }
- }
- }
- // check load success, if not throw exception
- if (!loadSuccess) {
- throw new IOException(s"Failed to load batch data on BE:
${dorisStreamLoader.getLoadUrlStr} node and exceeded the max ${maxRetryTimes}
retry times.", err)
- }
+ Utils.retry[Unit, Exception](maxRetryTimes, Duration.ofMillis(batchInterValMs.toLong), logger) {
+ dorisStreamLoader.loadV2(batch.toList.asJava)
+ } match {
+ case Success(_) =>
+ case Failure(e) =>
+ throw new IOException(
+ s"Failed to load batch data on BE:
${dorisStreamLoader.getLoadUrlStr} node and exceeded the max $maxRetryTimes
retry times.", e)
}
}
}
diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/Utils.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/Utils.scala
index 18dd3b2..ba6fa86 100644
--- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/Utils.scala
+++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/Utils.scala
@@ -25,6 +25,11 @@ import org.apache.spark.sql.sources._
import org.slf4j.Logger
import java.sql.{Date, Timestamp}
+import java.time.Duration
+import java.util.concurrent.locks.LockSupport
+import scala.annotation.tailrec
+import scala.reflect.ClassTag
+import scala.util.{Failure, Success, Try}
private[sql] object Utils {
/**
@@ -158,4 +163,19 @@ private[sql] object Utils {
finalParams
}
+
+ @tailrec
+ def retry[R, T <: Throwable : ClassTag](retryTimes: Int, interval: Duration, logger: Logger)(f: => R): Try[R] = {
+ assert(retryTimes >= 0)
+ val result = Try(f)
+ result match {
+ case Success(result) => Success(result)
+ case Failure(exception: T) if retryTimes > 0 =>
+ logger.warn(s"Execution failed caused by: ", exception)
+ logger.warn(s"$retryTimes times retry remaining, the next will be in
${interval.toMillis}ms")
+ LockSupport.parkNanos(interval.toNanos)
+ retry(retryTimes - 1, interval, logger)(f)
+ case Failure(exception) => Failure(exception)
+ }
+ }
}
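
For reference, a minimal caller-side sketch of the new helper. This is illustrative only: RetryExample, the attempt counter, and the chosen exception type are hypothetical, and since Utils is private[sql], a real caller would have to live in the org.apache.doris.spark.sql package.

import java.time.Duration
import org.slf4j.LoggerFactory
import scala.util.{Failure, Success}

object RetryExample {
  private val logger = LoggerFactory.getLogger(getClass)

  def main(args: Array[String]): Unit = {
    var attempts = 0
    // Retry only IllegalStateException, pausing 100 ms between attempts.
    // retryTimes = 3 allows up to four executions of the block in total.
    Utils.retry[Int, IllegalStateException](3, Duration.ofMillis(100), logger) {
      attempts += 1
      if (attempts < 3) throw new IllegalStateException(s"attempt $attempts failed")
      attempts
    } match {
      case Success(n) => logger.info(s"succeeded on attempt $n")
      case Failure(e) => throw new RuntimeException("retries exhausted", e)
    }
  }
}

Note that a Throwable not matching the T type parameter fails fast without retrying, and that the pause between attempts uses LockSupport.parkNanos rather than Thread.sleep, so the retry loop no longer needs to catch InterruptedException the way the old flush implementations did.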
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]