This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 86de79e [improvement] json format support json by line mode (#120)
86de79e is described below
commit 86de79ef8a59e49de99edc86ca85b7ac9c949bcb
Author: gnehil <[email protected]>
AuthorDate: Wed Jul 26 17:21:44 2023 +0800
[improvement] json format support json by line mode (#120)
---
.../apache/doris/spark/load/DorisStreamLoad.java | 19 +++++++------
.../org/apache/doris/spark/util/ListUtils.java | 33 ++++++++++++++++++----
.../org/apache/doris/spark/util/TestListUtils.java | 4 +--
.../doris/spark/sql/TestConnectorWriteDoris.scala | 23 +++++++++++++++
4 files changed, 63 insertions(+), 16 deletions(-)
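In effect, this change lets the connector send JSON rows to Doris as line-delimited JSON (one object per line, joined by line_delimiter) instead of always forcing strip_outer_array. A minimal usage sketch in Scala, mirroring the options exercised by the new test further down; the FE address, table identifier, and credentials are placeholders:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    val df = spark.createDataFrame(Seq(
      ("1", 100, "paid"),
      ("2", 200, "shipped")
    )).toDF("order_id", "order_amount", "order_status")
    df.write
      .format("doris")
      .option("doris.fenodes", "127.0.0.1:8030")        // placeholder FE address
      .option("doris.table.identifier", "test.orders")  // placeholder db.table
      .option("user", "root")                           // placeholder credentials
      .option("password", "")
      .option("sink.properties.format", "json")
      // send one JSON object per line instead of a single JSON array
      .option("sink.properties.read_json_by_line", "true")
      .save()
    spark.stop()

Setting both sink.properties.read_json_by_line and sink.properties.strip_outer_array to true is rejected with an IllegalArgumentException, as shown in the DorisStreamLoad change below.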
diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java
index 07e6624..e1c1bc1 100644
--- a/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java
@@ -69,7 +69,7 @@ import java.util.stream.Collectors;
**/
public class DorisStreamLoad implements Serializable {
private String FIELD_DELIMITER;
- private String LINE_DELIMITER;
+ private final String LINE_DELIMITER;
private static final String NULL_VALUE = "\\N";
private static final Logger LOG = LoggerFactory.getLogger(DorisStreamLoad.class);
@@ -89,6 +89,8 @@ public class DorisStreamLoad implements Serializable {
private final LoadingCache<String, List<BackendV2.BackendRowV2>> cache;
private final String fileType;
+ private boolean readJsonByLine = false;
+
public DorisStreamLoad(SparkSettings settings) {
String[] dbTable = settings.getProperty(ConfigurationOptions.DORIS_TABLE_IDENTIFIER).split("\\.");
this.db = dbTable[0];
@@ -105,6 +107,12 @@ public class DorisStreamLoad implements Serializable {
fileType = streamLoadProp.getOrDefault("format", "csv");
if ("csv".equals(fileType)){
FIELD_DELIMITER = EscapeHandler.escapeString(streamLoadProp.getOrDefault("column_separator", "\t"));
+ } else if ("json".equalsIgnoreCase(fileType)) {
+ readJsonByLine = Boolean.parseBoolean(streamLoadProp.getOrDefault("read_json_by_line", "false"));
+ boolean stripOuterArray = Boolean.parseBoolean(streamLoadProp.getOrDefault("strip_outer_array", "false"));
+ if (readJsonByLine && stripOuterArray) {
+ throw new IllegalArgumentException("Only one of options 'read_json_by_line' and 'strip_outer_array' can be set to true");
+ }
}
LINE_DELIMITER = EscapeHandler.escapeString(streamLoadProp.getOrDefault("line_delimiter", "\n"));
}
@@ -134,12 +142,7 @@ public class DorisStreamLoad implements Serializable {
httpPut.setHeader("max_filter_ratio", maxFilterRatio);
}
if (MapUtils.isNotEmpty(streamLoadProp)) {
- streamLoadProp.entrySet().stream()
- .filter(entry -> !"read_json_by_line".equals(entry.getKey()))
- .forEach(entry -> httpPut.setHeader(entry.getKey(), entry.getValue()));
- }
- if (fileType.equals("json")) {
- httpPut.setHeader("strip_outer_array", "true");
+ streamLoadProp.forEach(httpPut::setHeader);
}
return httpPut;
}
@@ -195,7 +198,7 @@ public class DorisStreamLoad implements Serializable {
throw new StreamLoadException("The number of configured columns does not match the number of data columns.");
}
// splits large collections to normal collection to avoid the "Requested array size exceeds VM limit" exception
- List<String> serializedList = ListUtils.getSerializedList(dataList);
+ List<String> serializedList = ListUtils.getSerializedList(dataList, readJsonByLine ? LINE_DELIMITER : null);
for (String serializedRows : serializedList) {
load(serializedRows);
}
diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/util/ListUtils.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/util/ListUtils.java
index 46a37ff..d8d31b9 100644
--- a/spark-doris-connector/src/main/java/org/apache/doris/spark/util/ListUtils.java
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/util/ListUtils.java
@@ -24,7 +24,6 @@ import org.apache.commons.lang3.exception.ExceptionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -34,9 +33,10 @@ public class ListUtils {
private static final ObjectMapper MAPPER = new ObjectMapper();
- public static List<String> getSerializedList(List<Map<Object, Object>> batch) throws JsonProcessingException {
+ public static List<String> getSerializedList(List<Map<Object, Object>> batch,
+ String lineDelimiter) throws JsonProcessingException {
List<String> result = new ArrayList<>();
- divideAndSerialize(batch, result);
+ divideAndSerialize(batch, result, lineDelimiter);
return result;
}
@@ -46,8 +46,9 @@ public class ListUtils {
* @param result
* @throws JsonProcessingException
*/
- public static void divideAndSerialize(List<Map<Object, Object>> batch, List<String> result) throws JsonProcessingException {
- String serializedResult = MAPPER.writeValueAsString(batch);
+ public static void divideAndSerialize(List<Map<Object, Object>> batch, List<String> result, String lineDelimiter)
+ throws JsonProcessingException {
+ String serializedResult = generateSerializedResult(batch, lineDelimiter);
// if an error occurred in the batch call to getBytes ,average divide the batch
try {
//the "Requested array size exceeds VM limit" exception occurs
when the collection is large
@@ -58,7 +59,7 @@ public class ListUtils {
LOG.error("getBytes error:{} ,average divide the collection", ExceptionUtils.getStackTrace(error));
}
for (List<Map<Object, Object>> avgSubCollection : getAvgSubCollections(batch)) {
- divideAndSerialize(avgSubCollection, result);
+ divideAndSerialize(avgSubCollection, result, lineDelimiter);
}
}
@@ -70,4 +71,24 @@ public class ListUtils {
public static List<List<Map<Object, Object>>> getAvgSubCollections(List<Map<Object, Object>> values) {
return Lists.partition(values, (values.size() + 1) / 2);
}
+
+ private static String generateSerializedResult(List<Map<Object, Object>> batch, String lineDelimiter)
+ throws JsonProcessingException {
+
+ // when lineDelimiter is null, use strip_outer_array mode, otherwise use json_by_line mode
+ if (lineDelimiter == null) {
+ return MAPPER.writeValueAsString(batch);
+ } else {
+ StringBuilder builder = new StringBuilder();
+ for (Map<Object, Object> data : batch) {
+ builder.append(MAPPER.writeValueAsString(data)).append(lineDelimiter);
+ }
+ int lastIdx = builder.lastIndexOf(lineDelimiter);
+ if (lastIdx != -1) {
+ return builder.substring(0, lastIdx);
+ }
+ return builder.toString();
+ }
+ }
+
}
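A small sketch of the resulting serialization difference in ListUtils, using a hypothetical two-row batch; with a null delimiter the batch is written as one JSON array (strip_outer_array mode), otherwise each map is serialized on its own line:

    import scala.collection.JavaConverters._
    import org.apache.doris.spark.util.ListUtils

    // hypothetical two-row batch
    val batch = List(
      Map[Object, Object]("id" -> "1").asJava,
      Map[Object, Object]("id" -> "2").asJava
    ).asJava

    // read_json_by_line mode: objects joined by the line delimiter
    // -> {"id":"1"}\n{"id":"2"}
    val byLine = ListUtils.getSerializedList(batch, "\n")

    // strip_outer_array mode (null delimiter): a single JSON array
    // -> [{"id":"1"},{"id":"2"}]
    val asArray = ListUtils.getSerializedList(batch, null)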
diff --git a/spark-doris-connector/src/test/java/org/apache/doris/spark/util/TestListUtils.java b/spark-doris-connector/src/test/java/org/apache/doris/spark/util/TestListUtils.java
index c0ec102..4e36418 100644
--- a/spark-doris-connector/src/test/java/org/apache/doris/spark/util/TestListUtils.java
+++ b/spark-doris-connector/src/test/java/org/apache/doris/spark/util/TestListUtils.java
@@ -34,9 +34,9 @@ public class TestListUtils {
Map<Object, Object> entity = new HashMap<>();
batch.add(entity);
}
- Assert.assertEquals(ListUtils.getSerializedList(batch).size(), 1);
+ Assert.assertEquals(ListUtils.getSerializedList(batch, "\n").size(), 1);
- Assert.assertEquals(ListUtils.getSerializedList(new ArrayList<>()).size(), 1);
+ Assert.assertEquals(ListUtils.getSerializedList(new ArrayList<>(), "\n").size(), 1);
}
}
diff --git a/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/TestConnectorWriteDoris.scala b/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/TestConnectorWriteDoris.scala
index 26f89af..ae3b066 100644
--- a/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/TestConnectorWriteDoris.scala
+++ b/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/TestConnectorWriteDoris.scala
@@ -108,4 +108,27 @@ class TestConnectorWriteDoris {
.start().awaitTermination()
}
+ @Test
+ def jsonWriteTest(): Unit = {
+ val spark = SparkSession.builder().master("local[*]").getOrCreate()
+ val df = spark.createDataFrame(Seq(
+ ("1", 100, "待付款"),
+ ("2", 200, "待发货"),
+ ("3", 300, "已收货")
+ )).toDF("order_id", "order_amount", "order_status")
+ df.write
+ .format("doris")
+ .option("doris.fenodes", dorisFeNodes)
+ .option("doris.table.identifier", dorisTable)
+ .option("user", dorisUser)
+ .option("password", dorisPwd)
+ .option("sink.batch.size", 2)
+ .option("sink.max-retries", 2)
+ .option("sink.properties.format", "json")
+ // .option("sink.properties.read_json_by_line", "true")
+ .option("sink.properties.strip_outer_array", "true")
+ .save()
+ spark.stop()
+ }
+
}