This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 4538a43 [refactor] Unified writing through DorisWriter (#104)
4538a43 is described below
commit 4538a430e3321be4949a9637f35bb163e5f1256f
Author: gnehil <[email protected]>
AuthorDate: Tue May 30 22:55:04 2023 +0800
[refactor] Unified writing through DorisWriter (#104)
* use writer to write data
* resolve conflicts
* unify jackson version
* remove useless code
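
After this change, both the batch relation provider (DorisSourceProvider) and the streaming sink (DorisStreamLoadSink) delegate to the new DorisWriter instead of driving DorisStreamLoad directly. A minimal sketch of the unified call path, using only class names taken from the diff below; the UnifiedWriteExample wrapper is hypothetical and the merging of user-supplied options is omitted for brevity:

import org.apache.doris.spark.cfg.SparkSettings
import org.apache.doris.spark.writer.DorisWriter
import org.apache.spark.sql.{DataFrame, SQLContext}

// Illustrative wrapper only; not part of the connector.
object UnifiedWriteExample {
  def writeToDoris(sqlContext: SQLContext, data: DataFrame): Unit = {
    // Build settings from the Spark conf (DorisSourceProvider additionally
    // merges the user-supplied options before this step).
    val settings = new SparkSettings(sqlContext.sparkContext.getConf)
    // Repartitioning, batching and retry logic now live inside DorisWriter.
    val writer = new DorisWriter(settings)
    writer.write(data)
  }
}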
---
spark-doris-connector/pom.xml | 2 +-
.../{ => load}/CachedDorisStreamLoadClient.java | 7 +--
.../doris/spark/{ => load}/DorisStreamLoad.java | 39 +++++++-------
.../doris/spark/sql/DorisSourceProvider.scala | 62 ++--------------------
.../doris/spark/sql/DorisStreamLoadSink.scala | 52 ++----------------
.../scala/org/apache/doris/spark/sql/Utils.scala | 6 ++-
.../DorisWriter.scala} | 54 +++++++++----------
7 files changed, 60 insertions(+), 162 deletions(-)
diff --git a/spark-doris-connector/pom.xml b/spark-doris-connector/pom.xml
index 77b37c1..e4b4c8b 100644
--- a/spark-doris-connector/pom.xml
+++ b/spark-doris-connector/pom.xml
@@ -70,7 +70,7 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.scm.id>github</project.scm.id>
<netty.version>4.1.77.Final</netty.version>
- <fasterxml.jackson.version>2.13.3</fasterxml.jackson.version>
+ <fasterxml.jackson.version>2.10.5</fasterxml.jackson.version>
<thrift-service.version>1.0.0</thrift-service.version>
</properties>
diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/CachedDorisStreamLoadClient.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/CachedDorisStreamLoadClient.java
similarity index 90%
rename from spark-doris-connector/src/main/java/org/apache/doris/spark/CachedDorisStreamLoadClient.java
rename to spark-doris-connector/src/main/java/org/apache/doris/spark/load/CachedDorisStreamLoadClient.java
index 1d89126..d3dab49 100644
--- a/spark-doris-connector/src/main/java/org/apache/doris/spark/CachedDorisStreamLoadClient.java
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/CachedDorisStreamLoadClient.java
@@ -15,17 +15,12 @@
// specific language governing permissions and limitations
// under the License.
-package org.apache.doris.spark;
+package org.apache.doris.spark.load;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
-import com.google.common.cache.RemovalListener;
-import com.google.common.cache.RemovalNotification;
import org.apache.doris.spark.cfg.SparkSettings;
-import org.apache.doris.spark.exception.DorisException;
-
-import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java
similarity index 96%
rename from spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java
rename to spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java
index 6738c09..61379e3 100644
--- a/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java
@@ -14,15 +14,8 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
-package org.apache.doris.spark;
+package org.apache.doris.spark.load;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-import org.apache.commons.collections.MapUtils;
-import org.apache.commons.lang3.StringUtils;
import org.apache.doris.spark.cfg.ConfigurationOptions;
import org.apache.doris.spark.cfg.SparkSettings;
import org.apache.doris.spark.exception.StreamLoadException;
@@ -30,6 +23,14 @@ import org.apache.doris.spark.rest.RestService;
import org.apache.doris.spark.rest.models.BackendV2;
import org.apache.doris.spark.rest.models.RespContent;
import org.apache.doris.spark.util.ListUtils;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import org.apache.commons.collections.MapUtils;
+import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpHeaders;
import org.apache.http.HttpResponse;
import org.apache.http.HttpStatus;
@@ -45,10 +46,17 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
-import java.sql.Date;
import java.sql.Timestamp;
-import java.text.SimpleDateFormat;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Base64;
+import java.util.Calendar;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@@ -73,13 +81,11 @@ public class DorisStreamLoad implements Serializable {
private String tbl;
private String authEncoded;
private String columns;
- private String[] dfColumns;
private String maxFilterRatio;
private Map<String, String> streamLoadProp;
private static final long cacheExpireTimeout = 4 * 60;
private final LoadingCache<String, List<BackendV2.BackendRowV2>> cache;
private final String fileType;
- private final SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSSSSS");
public DorisStreamLoad(SparkSettings settings) {
String[] dbTable = settings.getProperty(ConfigurationOptions.DORIS_TABLE_IDENTIFIER).split("\\.");
@@ -101,11 +107,6 @@ public class DorisStreamLoad implements Serializable {
}
}
- public DorisStreamLoad(SparkSettings settings, String[] dfColumns) {
- this(settings);
- this.dfColumns = dfColumns;
- }
-
public String getLoadUrlStr() {
if (StringUtils.isEmpty(loadUrlStr)) {
return "";
@@ -168,7 +169,7 @@ public class DorisStreamLoad implements Serializable {
}
- public void loadV2(List<List<Object>> rows) throws StreamLoadException, JsonProcessingException {
+ public void loadV2(List<List<Object>> rows, String[] dfColumns) throws StreamLoadException, JsonProcessingException {
if (fileType.equals("csv")) {
load(listToString(rows));
} else if(fileType.equals("json")) {
diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala
index e469f38..94fab9e 100644
--- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala
+++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala
@@ -17,9 +17,9 @@
package org.apache.doris.spark.sql
-import org.apache.doris.spark.DorisStreamLoad
-import org.apache.doris.spark.cfg.{ConfigurationOptions, SparkSettings}
+import org.apache.doris.spark.cfg.SparkSettings
import org.apache.doris.spark.sql.DorisSourceProvider.SHORT_NAME
+import org.apache.doris.spark.writer.DorisWriter
import org.apache.spark.SparkConf
import org.apache.spark.sql.execution.streaming.Sink
import org.apache.spark.sql.sources._
@@ -28,12 +28,7 @@ import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
import org.slf4j.{Logger, LoggerFactory}
-import java.io.IOException
-import java.time.Duration
-import java.util
-import java.util.Objects
import scala.collection.JavaConverters.mapAsJavaMapConverter
-import scala.util.{Failure, Success}
private[sql] class DorisSourceProvider extends DataSourceRegister
with RelationProvider
@@ -60,58 +55,9 @@ private[sql] class DorisSourceProvider extends DataSourceRegister
val sparkSettings = new SparkSettings(sqlContext.sparkContext.getConf)
sparkSettings.merge(Utils.params(parameters, logger).asJava)
// init stream loader
- val dorisStreamLoader = new DorisStreamLoad(sparkSettings, data.columns)
+ val writer = new DorisWriter(sparkSettings)
+ writer.write(data)
- val maxRowCount = sparkSettings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_BATCH_SIZE, ConfigurationOptions.SINK_BATCH_SIZE_DEFAULT)
- val maxRetryTimes = sparkSettings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_MAX_RETRIES, ConfigurationOptions.SINK_MAX_RETRIES_DEFAULT)
- val sinkTaskPartitionSize = sparkSettings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_TASK_PARTITION_SIZE)
- val sinkTaskUseRepartition = sparkSettings.getProperty(ConfigurationOptions.DORIS_SINK_TASK_USE_REPARTITION, ConfigurationOptions.DORIS_SINK_TASK_USE_REPARTITION_DEFAULT.toString).toBoolean
- val batchInterValMs = sparkSettings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_BATCH_INTERVAL_MS, ConfigurationOptions.DORIS_SINK_BATCH_INTERVAL_MS_DEFAULT)
-
- logger.info(s"maxRowCount ${maxRowCount}")
- logger.info(s"maxRetryTimes ${maxRetryTimes}")
- logger.info(s"batchInterVarMs ${batchInterValMs}")
-
- var resultRdd = data.rdd
- if (Objects.nonNull(sinkTaskPartitionSize)) {
- resultRdd = if (sinkTaskUseRepartition) resultRdd.repartition(sinkTaskPartitionSize) else resultRdd.coalesce(sinkTaskPartitionSize)
- }
-
- resultRdd.foreachPartition(partition => {
- val rowsBuffer: util.List[util.List[Object]] = new util.ArrayList[util.List[Object]](maxRowCount)
- partition.foreach(row => {
- val line: util.List[Object] = new util.ArrayList[Object]()
- for (i <- 0 until row.size) {
- val field = row.get(i)
- line.add(field.asInstanceOf[AnyRef])
- }
- rowsBuffer.add(line)
- if (rowsBuffer.size > maxRowCount - 1 ) {
- flush()
- }
- })
- // flush buffer
- if (!rowsBuffer.isEmpty) {
- flush()
- }
-
- /**
- * flush data to Doris and do retry when flush error
- *
- */
- def flush(): Unit = {
- Utils.retry[Unit, Exception](maxRetryTimes, Duration.ofMillis(batchInterValMs.toLong), logger) {
- dorisStreamLoader.loadV2(rowsBuffer)
- rowsBuffer.clear()
- } match {
- case Success(_) =>
- case Failure(e) =>
- throw new IOException(
- s"Failed to load $maxRowCount batch data on BE:
${dorisStreamLoader.getLoadUrlStr} node and exceeded the max ${maxRetryTimes}
retry times.", e)
- }
- }
-
- })
new BaseRelation {
override def sqlContext: SQLContext = unsupportedException
diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala
index 4644820..342e940 100644
--- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala
+++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala
@@ -17,69 +17,27 @@
package org.apache.doris.spark.sql
-import org.apache.doris.spark.cfg.{ConfigurationOptions, SparkSettings}
-import org.apache.doris.spark.{CachedDorisStreamLoadClient, DorisStreamLoad}
-import org.apache.spark.rdd.RDD
+import org.apache.doris.spark.cfg.SparkSettings
+import org.apache.doris.spark.writer.DorisWriter
import org.apache.spark.sql.execution.streaming.Sink
-import org.apache.spark.sql.{DataFrame, Row, SQLContext}
+import org.apache.spark.sql.{DataFrame, SQLContext}
import org.slf4j.{Logger, LoggerFactory}
-import java.io.IOException
-import java.time.Duration
-import java.util
-import java.util.Objects
-import scala.collection.JavaConverters._
-import scala.util.{Failure, Success}
-
private[sql] class DorisStreamLoadSink(sqlContext: SQLContext, settings: SparkSettings) extends Sink with Serializable {
private val logger: Logger = LoggerFactory.getLogger(classOf[DorisStreamLoadSink].getName)
@volatile private var latestBatchId = -1L
- val batchSize: Int = settings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_BATCH_SIZE, ConfigurationOptions.SINK_BATCH_SIZE_DEFAULT)
- val maxRetryTimes: Int = settings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_MAX_RETRIES, ConfigurationOptions.SINK_MAX_RETRIES_DEFAULT)
- val sinkTaskPartitionSize = settings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_TASK_PARTITION_SIZE)
- val sinkTaskUseRepartition = settings.getProperty(ConfigurationOptions.DORIS_SINK_TASK_USE_REPARTITION, ConfigurationOptions.DORIS_SINK_TASK_USE_REPARTITION_DEFAULT.toString).toBoolean
- val batchInterValMs = settings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_BATCH_INTERVAL_MS, ConfigurationOptions.DORIS_SINK_BATCH_INTERVAL_MS_DEFAULT)
- val dorisStreamLoader: DorisStreamLoad = CachedDorisStreamLoadClient.getOrCreate(settings)
+ private val writer = new DorisWriter(settings)
override def addBatch(batchId: Long, data: DataFrame): Unit = {
if (batchId <= latestBatchId) {
logger.info(s"Skipping already committed batch $batchId")
} else {
- write(data.rdd)
+ writer.write(data)
latestBatchId = batchId
}
}
- def write(rdd: RDD[Row]): Unit = {
- var resultRdd = rdd
- if (Objects.nonNull(sinkTaskPartitionSize)) {
- resultRdd = if (sinkTaskUseRepartition) resultRdd.repartition(sinkTaskPartitionSize) else resultRdd.coalesce(sinkTaskPartitionSize)
- }
- resultRdd
- .map(_.toSeq.map(_.asInstanceOf[AnyRef]).toList.asJava)
- .foreachPartition(partition => {
- partition
- .grouped(batchSize)
- .foreach(batch => flush(batch))
- })
-
- /**
- * flush data to Doris and do retry when flush error
- *
- */
- def flush(batch: Iterable[util.List[Object]]): Unit = {
- Utils.retry[Unit, Exception](maxRetryTimes, Duration.ofMillis(batchInterValMs.toLong), logger) {
- dorisStreamLoader.loadV2(batch.toList.asJava)
- } match {
- case Success(_) =>
- case Failure(e) =>
- throw new IOException(
- s"Failed to load batch data on BE:
${dorisStreamLoader.getLoadUrlStr} node and exceeded the max $maxRetryTimes
retry times.", e)
- }
- }
- }
-
override def toString: String = "DorisStreamLoadSink"
}
diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/Utils.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/Utils.scala
index ba6fa86..2f3a5bb 100644
--- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/Utils.scala
+++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/Utils.scala
@@ -31,7 +31,7 @@ import scala.annotation.tailrec
import scala.reflect.ClassTag
import scala.util.{Failure, Success, Try}
-private[sql] object Utils {
+private[spark] object Utils {
/**
* quote column name
* @param colName column name
@@ -169,7 +169,9 @@ private[sql] object Utils {
assert(retryTimes >= 0)
val result = Try(f)
result match {
- case Success(result) => Success(result)
+ case Success(result) =>
+ LockSupport.parkNanos(interval.toNanos)
+ Success(result)
case Failure(exception: T) if retryTimes > 0 =>
logger.warn(s"Execution failed caused by: ", exception)
logger.warn(s"$retryTimes times retry remaining, the next will be in
${interval.toMillis}ms")
diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/writer/DorisWriter.scala
similarity index 51%
copy from spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala
copy to spark-doris-connector/src/main/scala/org/apache/doris/spark/writer/DorisWriter.scala
index 4644820..3839ff7 100644
--- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala
+++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/writer/DorisWriter.scala
@@ -15,13 +15,12 @@
// specific language governing permissions and limitations
// under the License.
-package org.apache.doris.spark.sql
+package org.apache.doris.spark.writer
import org.apache.doris.spark.cfg.{ConfigurationOptions, SparkSettings}
-import org.apache.doris.spark.{CachedDorisStreamLoadClient, DorisStreamLoad}
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.execution.streaming.Sink
-import org.apache.spark.sql.{DataFrame, Row, SQLContext}
+import org.apache.doris.spark.load.{CachedDorisStreamLoadClient, DorisStreamLoad}
+import org.apache.doris.spark.sql.Utils
+import org.apache.spark.sql.DataFrame
import org.slf4j.{Logger, LoggerFactory}
import java.io.IOException
@@ -31,29 +30,25 @@ import java.util.Objects
import scala.collection.JavaConverters._
import scala.util.{Failure, Success}
-private[sql] class DorisStreamLoadSink(sqlContext: SQLContext, settings: SparkSettings) extends Sink with Serializable {
+class DorisWriter(settings: SparkSettings) extends Serializable {
- private val logger: Logger = LoggerFactory.getLogger(classOf[DorisStreamLoadSink].getName)
- @volatile private var latestBatchId = -1L
- val batchSize: Int = settings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_BATCH_SIZE, ConfigurationOptions.SINK_BATCH_SIZE_DEFAULT)
- val maxRetryTimes: Int = settings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_MAX_RETRIES, ConfigurationOptions.SINK_MAX_RETRIES_DEFAULT)
- val sinkTaskPartitionSize = settings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_TASK_PARTITION_SIZE)
- val sinkTaskUseRepartition = settings.getProperty(ConfigurationOptions.DORIS_SINK_TASK_USE_REPARTITION, ConfigurationOptions.DORIS_SINK_TASK_USE_REPARTITION_DEFAULT.toString).toBoolean
- val batchInterValMs = settings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_BATCH_INTERVAL_MS, ConfigurationOptions.DORIS_SINK_BATCH_INTERVAL_MS_DEFAULT)
+ private val logger: Logger = LoggerFactory.getLogger(classOf[DorisWriter])
- val dorisStreamLoader: DorisStreamLoad = CachedDorisStreamLoadClient.getOrCreate(settings)
+ val batchSize: Int = settings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_BATCH_SIZE,
+ ConfigurationOptions.SINK_BATCH_SIZE_DEFAULT)
+ private val maxRetryTimes: Int = settings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_MAX_RETRIES,
+ ConfigurationOptions.SINK_MAX_RETRIES_DEFAULT)
+ private val sinkTaskPartitionSize: Integer = settings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_TASK_PARTITION_SIZE)
+ private val sinkTaskUseRepartition: Boolean = settings.getProperty(ConfigurationOptions.DORIS_SINK_TASK_USE_REPARTITION,
+ ConfigurationOptions.DORIS_SINK_TASK_USE_REPARTITION_DEFAULT.toString).toBoolean
+ private val batchInterValMs: Integer = settings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_BATCH_INTERVAL_MS,
+ ConfigurationOptions.DORIS_SINK_BATCH_INTERVAL_MS_DEFAULT)
- override def addBatch(batchId: Long, data: DataFrame): Unit = {
- if (batchId <= latestBatchId) {
- logger.info(s"Skipping already committed batch $batchId")
- } else {
- write(data.rdd)
- latestBatchId = batchId
- }
- }
+ private val dorisStreamLoader: DorisStreamLoad = CachedDorisStreamLoadClient.getOrCreate(settings)
- def write(rdd: RDD[Row]): Unit = {
- var resultRdd = rdd
+ def write(dataFrame: DataFrame): Unit = {
+ var resultRdd = dataFrame.rdd
+ val dfColumns = dataFrame.columns
if (Objects.nonNull(sinkTaskPartitionSize)) {
resultRdd = if (sinkTaskUseRepartition) resultRdd.repartition(sinkTaskPartitionSize) else resultRdd.coalesce(sinkTaskPartitionSize)
}
@@ -62,24 +57,25 @@ private[sql] class DorisStreamLoadSink(sqlContext: SQLContext, settings: SparkSe
.foreachPartition(partition => {
partition
.grouped(batchSize)
- .foreach(batch => flush(batch))
+ .foreach(batch => flush(batch, dfColumns))
})
/**
* flush data to Doris and do retry when flush error
*
*/
- def flush(batch: Iterable[util.List[Object]]): Unit = {
+ def flush(batch: Iterable[util.List[Object]], dfColumns: Array[String]): Unit = {
Utils.retry[Unit, Exception](maxRetryTimes, Duration.ofMillis(batchInterValMs.toLong), logger) {
- dorisStreamLoader.loadV2(batch.toList.asJava)
+ dorisStreamLoader.loadV2(batch.toList.asJava, dfColumns)
} match {
case Success(_) =>
case Failure(e) =>
throw new IOException(
- s"Failed to load batch data on BE:
${dorisStreamLoader.getLoadUrlStr} node and exceeded the max $maxRetryTimes
retry times.", e)
+ s"Failed to load batch data on BE:
${dorisStreamLoader.getLoadUrlStr} node and exceeded the max ${maxRetryTimes}
retry times.", e)
}
}
+
}
- override def toString: String = "DorisStreamLoadSink"
+
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]