This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 38c2718 [fix] The result of serialization of decimal type does not
meet the expected problem (#155)
38c2718 is described below
commit 38c2718f44af9d13e41b59622b5ccac8a03f413e
Author: gnehil <[email protected]>
AuthorDate: Tue Nov 7 10:54:56 2023 +0800
[fix] The result of serialization of decimal type does not meet the
expected problem (#155)
---
.../src/main/java/org/apache/doris/spark/util/DataUtil.java | 10 +++++-----
.../main/scala/org/apache/doris/spark/sql/SchemaUtils.scala | 2 +-
.../main/scala/org/apache/doris/spark/writer/DorisWriter.scala | 4 ++++
3 files changed, 10 insertions(+), 6 deletions(-)
diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/util/DataUtil.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/util/DataUtil.java
index 3f53d45..530657e 100644
--- a/spark-doris-connector/src/main/java/org/apache/doris/spark/util/DataUtil.java
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/util/DataUtil.java
@@ -17,10 +17,11 @@
package org.apache.doris.spark.util;
-import org.apache.doris.spark.sql.SchemaUtils;
-
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.json.JsonMapper;
+import com.fasterxml.jackson.module.scala.DefaultScalaModule;
+import org.apache.doris.spark.sql.SchemaUtils;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
@@ -31,7 +32,7 @@ import java.util.Map;
public class DataUtil {
- private static final ObjectMapper MAPPER = new ObjectMapper();
+ private static final ObjectMapper MAPPER = JsonMapper.builder().addModule(new DefaultScalaModule()).build();
public static final String NULL_VALUE = "\\N";
@@ -67,8 +68,7 @@ public class DataUtil {
return builder.toString().getBytes(StandardCharsets.UTF_8);
}
- public static byte[] rowToJsonBytes(InternalRow row, StructType schema)
- throws JsonProcessingException {
+ public static byte[] rowToJsonBytes(InternalRow row, StructType schema) throws JsonProcessingException {
StructField[] fields = schema.fields();
Map<String, Object> rowMap = new HashMap<>(row.numFields());
for (int i = 0; i < fields.length; i++) {
diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/SchemaUtils.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/SchemaUtils.scala
index 982e580..1f0e942 100644
--- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/SchemaUtils.scala
+++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/SchemaUtils.scala
@@ -168,7 +168,7 @@ private[spark] object SchemaUtils {
new Timestamp(row.getLong(ordinal) / 1000).toString
case DateType => DateTimeUtils.toJavaDate(row.getInt(ordinal)).toString
case BinaryType => row.getBinary(ordinal)
- case dt: DecimalType => row.getDecimal(ordinal, dt.precision, dt.scale)
+ case dt: DecimalType => row.getDecimal(ordinal, dt.precision, dt.scale).toJavaBigDecimal
case at: ArrayType =>
val arrayData = row.getArray(ordinal)
if (arrayData == null) DataUtil.NULL_VALUE
diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/writer/DorisWriter.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/writer/DorisWriter.scala
index f90bcc6..55f4d73 100644
--- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/writer/DorisWriter.scala
+++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/writer/DorisWriter.scala
@@ -79,6 +79,10 @@ class DorisWriter(settings: SparkSettings) extends Serializable {
* @param dataFrame source dataframe
*/
def writeStream(dataFrame: DataFrame): Unit = {
+ if (enable2PC) {
+ val errMsg = "two phrase commit is not supported in stream mode, please set doris.sink.enable-2pc to false."
+ throw new UnsupportedOperationException(errMsg)
+ }
doWrite(dataFrame, dorisStreamLoader.loadStream)
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]