This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 24c2fd4 [Improve] Use streaming style on grouping, transforming and flushing RDD partitions (#82)
24c2fd4 is described below
commit 24c2fd4c33944f3802bbae4b81658b4d2d1fcaaf
Author: Bowen Liang <[email protected]>
AuthorDate: Wed Mar 29 10:58:33 2023 +0800
[Improve] Use streaming style on grouping, transforming and flushing RDD partitions (#82)
* loop partition in scala style
* update
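
For context, the core of this change is replacing a manually managed row buffer with lazy batching via Scala's Iterator.grouped, as the diff below shows. A minimal sketch of the pattern, independent of Spark (the flush stub and sample rows here are hypothetical, for illustration only):

import java.util
import scala.collection.JavaConverters._

object GroupedBatchSketch {
  // Hypothetical stand-in for DorisStreamLoad.loadV2; prints instead of loading.
  def flush(batch: Iterable[util.List[Object]]): Unit =
    println(s"flushing ${batch.size} rows")

  def main(args: Array[String]): Unit = {
    val batchSize = 3
    // Simulates one RDD partition: an iterator of rows.
    val partition: Iterator[Seq[Any]] = Iterator(
      Seq(1, "a"), Seq(2, "b"), Seq(3, "c"), Seq(4, "d"), Seq(5, "e"))

    partition
      .map(_.map(_.asInstanceOf[AnyRef]).toList.asJava) // row -> java.util.List[Object]
      .grouped(batchSize)                               // lazy batching, no manual buffer
      .foreach(flush)                                   // flush each full (or final partial) batch
  }
}

Unlike the removed rowsBuffer, grouped never materializes more than one batch at a time and emits the trailing partial batch itself, so no separate post-loop flush is needed.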
---
.../doris/spark/sql/DorisStreamLoadSink.scala | 87 +++++++++-------------
1 file changed, 35 insertions(+), 52 deletions(-)
diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala
index fea58ac..4796e4b 100644
--- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala
+++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala
@@ -18,12 +18,14 @@
package org.apache.doris.spark.sql
import org.apache.doris.spark.cfg.{ConfigurationOptions, SparkSettings}
+import org.apache.doris.spark.sql.DorisWriterOptionKeys.maxRowCount
import org.apache.doris.spark.{CachedDorisStreamLoadClient, DorisStreamLoad}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.execution.streaming.Sink
import org.apache.spark.sql.{DataFrame, Row, SQLContext}
import org.slf4j.{Logger, LoggerFactory}
+import collection.JavaConverters._
import java.io.IOException
import java.util
import java.util.Objects
@@ -33,7 +35,7 @@ private[sql] class DorisStreamLoadSink(sqlContext: SQLContext, settings: SparkSe
private val logger: Logger = LoggerFactory.getLogger(classOf[DorisStreamLoadSink].getName)
@volatile private var latestBatchId = -1L
- val maxRowCount: Int = settings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_BATCH_SIZE, ConfigurationOptions.SINK_BATCH_SIZE_DEFAULT)
+ val batchSize: Int = settings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_BATCH_SIZE, ConfigurationOptions.SINK_BATCH_SIZE_DEFAULT)
val maxRetryTimes: Int = settings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_MAX_RETRIES, ConfigurationOptions.SINK_MAX_RETRIES_DEFAULT)
val sinkTaskPartitionSize = settings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_TASK_PARTITION_SIZE)
val sinkTaskUseRepartition = settings.getProperty(ConfigurationOptions.DORIS_SINK_TASK_USE_REPARTITION, ConfigurationOptions.DORIS_SINK_TASK_USE_REPARTITION_DEFAULT.toString).toBoolean
@@ -55,62 +57,43 @@ private[sql] class DorisStreamLoadSink(sqlContext: SQLContext, settings: SparkSe
if (Objects.nonNull(sinkTaskPartitionSize)) {
resultRdd = if (sinkTaskUseRepartition) resultRdd.repartition(sinkTaskPartitionSize) else resultRdd.coalesce(sinkTaskPartitionSize)
}
- // write for each partition
- resultRdd.foreachPartition(partition => {
- val rowsBuffer: util.List[util.List[Object]] = new util.ArrayList[util.List[Object]](maxRowCount)
- partition.foreach(row => {
- val line: util.List[Object] = new util.ArrayList[Object]()
- for (i <- 0 until row.size) {
- val field = row.get(i)
- line.add(field.asInstanceOf[AnyRef])
- }
- rowsBuffer.add(line)
- if (rowsBuffer.size > maxRowCount - 1) {
- flush
- }
+ resultRdd
+ .map(_.toSeq.map(_.asInstanceOf[AnyRef]).toList.asJava)
+ .foreachPartition(partition => {
+ partition
+ .grouped(batchSize)
+ .foreach(batch => flush(batch))
})
- // flush buffer
- if (!rowsBuffer.isEmpty) {
- flush
- }
-
- /**
- * flush data to Doris and do retry when flush error
- *
- */
- def flush(): Unit = {
- val loop = new Breaks
- var err: Exception = null
- loop.breakable {
- for (i <- 1 to maxRetryTimes) {
- try {
- dorisStreamLoader.loadV2(rowsBuffer)
- rowsBuffer.clear()
- Thread.sleep(batchInterValMs.longValue())
- loop.break()
- }
- catch {
- case e: Exception =>
- try {
- logger.debug("Failed to load data on BE: {} node ",
dorisStreamLoader.getLoadUrlStr)
- if (err == null) err = e
- Thread.sleep(1000 * i)
- } catch {
- case ex: InterruptedException =>
- Thread.currentThread.interrupt()
- throw new IOException("unable to flush; interrupted while
doing another attempt", ex)
- }
- }
- }
-
- if (!rowsBuffer.isEmpty) {
- throw new IOException(s"Failed to load ${maxRowCount} batch data
on BE: ${dorisStreamLoader.getLoadUrlStr} node and exceeded the max
${maxRetryTimes} retry times.", err)
+ /**
+ * flush data to Doris and do retry when flush error
+ *
+ */
+ def flush(batch: Iterable[util.List[Object]]): Unit = {
+ val loop = new Breaks
+ var err: Exception = null
+ loop.breakable {
+ (1 to maxRetryTimes).foreach { i =>
+ try {
+ dorisStreamLoader.loadV2(batch.toList.asJava)
+ Thread.sleep(batchInterValMs.longValue())
+ loop.break()
+ } catch {
+ case e: Exception =>
+ try {
+ logger.debug("Failed to load data on BE: {} node ",
dorisStreamLoader.getLoadUrlStr)
+ if (err == null) err = e
+ Thread.sleep(1000 * i)
+ } catch {
+ case ex: InterruptedException =>
+ Thread.currentThread.interrupt()
+ throw new IOException("unable to flush; interrupted while
doing another attempt", ex)
+ }
}
+ throw new IOException(s"Failed to load $maxRowCount batch data on
BE: ${dorisStreamLoader.getLoadUrlStr} node and exceeded the max
${maxRetryTimes} retry times.", err)
}
-
}
- })
+ }
}
override def toString: String = "DorisStreamLoadSink"
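
For readers unfamiliar with the retry pattern kept in flush above, here is a self-contained sketch of scala.util.control.Breaks with linearly increasing backoff. The load stub is hypothetical and fails on purpose so the retry path is exercised:

import java.io.IOException
import scala.util.control.Breaks

object RetrySketch {
  val maxRetryTimes = 3

  // Hypothetical loader that always fails, standing in for loadV2.
  def load(batch: Seq[String]): Unit = throw new RuntimeException("BE unavailable")

  def flushWithRetry(batch: Seq[String]): Unit = {
    val loop = new Breaks
    var err: Exception = null
    loop.breakable {
      (1 to maxRetryTimes).foreach { i =>
        try {
          load(batch)
          loop.break() // success: jumps out of the breakable block
        } catch {
          case e: Exception =>
            if (err == null) err = e // remember the first failure as the cause
            Thread.sleep(1000L * i)  // linear backoff: 1s, 2s, 3s, ...
        }
      }
      // Reached only when every attempt failed; break() skips this line.
      throw new IOException(s"exceeded the max $maxRetryTimes retry times", err)
    }
  }
}

Note that break() exits the breakable block entirely, so the trailing IOException is thrown only when all maxRetryTimes attempts fail, mirroring the control flow of the committed flush.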
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]