This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch api-draft
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/api-draft by this push:
new af7f6e01 [Api-draft] Fix Spark Sink Can't support batch mode. (#1868)
af7f6e01 is described below
commit af7f6e019a1d538e14de331e567d41da30b1ff97
Author: TrickyZerg <[email protected]>
AuthorDate: Fri May 13 17:31:13 2022 +0800
[Api-draft] Fix Spark Sink Can't support batch mode. (#1868)
* Fix Spark Sink Can't support batch mode.
---
.../seatunnel/api/serialization/DefaultSerializer.java | 14 ++++++++------
.../apache/seatunnel/common/utils/SerializationUtils.java | 9 +++++++++
.../translation/spark/sink/SparkSinkInjector.java | 13 ++++++++++---
3 files changed, 27 insertions(+), 9 deletions(-)
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/serialization/DefaultSerializer.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/serialization/DefaultSerializer.java
index ec213c69..c17a1d58 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/serialization/DefaultSerializer.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/serialization/DefaultSerializer.java
@@ -17,18 +17,20 @@
package org.apache.seatunnel.api.serialization;
+import org.apache.seatunnel.common.utils.SerializationUtils;
+
import java.io.IOException;
-import java.nio.charset.StandardCharsets;
+import java.io.Serializable;
-public class DefaultSerializer implements Serializer<String> {
+public class DefaultSerializer<T extends Serializable> implements
Serializer<T> {
@Override
- public byte[] serialize(String obj) throws IOException {
- return obj.getBytes(StandardCharsets.UTF_8);
+ public byte[] serialize(T obj) throws IOException {
+ return SerializationUtils.serialize(obj);
}
@Override
- public String deserialize(byte[] serialized) throws IOException {
- return new String(serialized);
+ public T deserialize(byte[] serialized) throws IOException {
+ return SerializationUtils.deserialize(serialized);
}
}
diff --git a/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/SerializationUtils.java b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/SerializationUtils.java
index 0b6e9c52..43dead16 100644
--- a/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/SerializationUtils.java
+++ b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/SerializationUtils.java
@@ -37,4 +37,13 @@ public class SerializationUtils {
}
return null;
}
+
+ public static <T extends Serializable> byte[] serialize(T obj) {
+ return org.apache.commons.lang3.SerializationUtils.serialize(obj);
+ }
+
+ public static <T extends Serializable> T deserialize(byte[] bytes) {
+ return org.apache.commons.lang3.SerializationUtils.deserialize(bytes);
+ }
+
}
diff --git a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkSinkInjector.java b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkSinkInjector.java
index 8aeb4537..cb45b70d 100644
--- a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkSinkInjector.java
+++ b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkSinkInjector.java
@@ -20,7 +20,7 @@ package org.apache.seatunnel.translation.spark.sink;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.common.utils.SerializationUtils;
-import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.DataFrameWriter;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.streaming.DataStreamWriter;
import org.apache.spark.sql.streaming.OutputMode;
@@ -31,9 +31,16 @@ public class SparkSinkInjector {
private static final String SPARK_SINK_CLASS_NAME =
"org.apache.seatunnel.translation.spark.sink.SparkSink";
- public static DataStreamWriter<Row> inject(Dataset<Row> dataset,
SeaTunnelSink<?, ?, ?, ?> sink,
+ public static DataStreamWriter<Row> inject(DataStreamWriter<Row> dataset,
SeaTunnelSink<?, ?, ?, ?> sink,
HashMap<String, String>
configuration) {
- return
dataset.writeStream().format(SPARK_SINK_CLASS_NAME).outputMode(OutputMode.Append())
+ return
dataset.format(SPARK_SINK_CLASS_NAME).outputMode(OutputMode.Append())
+ .option("configuration",
SerializationUtils.objectToString(configuration)).option("sink",
+ SerializationUtils.objectToString(sink));
+ }
+
+ public static DataFrameWriter<Row> inject(DataFrameWriter<Row> dataset,
SeaTunnelSink<?, ?, ?, ?> sink,
+ HashMap<String, String>
configuration) {
+ return dataset.format(SPARK_SINK_CLASS_NAME)
.option("configuration",
SerializationUtils.objectToString(configuration)).option("sink",
SerializationUtils.objectToString(sink));
}