This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new fe7ff3d0c [spark] Spark supports read changelog (#2107)
fe7ff3d0c is described below
commit fe7ff3d0c937f8b9d90ee0017e780545aa7bb6d4
Author: Zouxxyy <[email protected]>
AuthorDate: Tue Oct 10 18:27:44 2023 +0800
[spark] Spark supports read changelog (#2107)
---
docs/content/engines/spark3.md | 21 ++
.../generated/spark_connector_configuration.html | 6 +
.../java/org/apache/paimon/utils/TypeUtils.java | 9 +
.../apache/paimon/spark/SparkConnectorOptions.java | 7 +
.../java/org/apache/paimon/spark/SparkScan.java | 4 +-
.../org/apache/paimon/spark/SparkSource.scala | 16 +-
.../spark/sources/PaimonMicroBatchStream.scala | 6 +-
.../apache/paimon/spark/sources/StreamHelper.scala | 18 +-
.../apache/paimon/spark/PaimonCDCSourceTest.scala | 219 +++++++++++++++++++++
9 files changed, 288 insertions(+), 18 deletions(-)
diff --git a/docs/content/engines/spark3.md b/docs/content/engines/spark3.md
index e659d6dbf..e1c113e44 100644
--- a/docs/content/engines/spark3.md
+++ b/docs/content/engines/spark3.md
@@ -402,6 +402,27 @@ val query = spark.readStream
.start()
```
+Paimon Structured Streaming supports reading rows in the form of a changelog
+(adding a rowkind column to each row to represent its change type) by setting
+`read.changelog` to `true` (the default is `false`).
+
+**Example:**
+
+```scala
+// If no scan-related configs are provided, the latest-full scan mode is used.
+val query = spark.readStream
+ .format("paimon")
+ .option("read.changelog", "true")
+ .load("/path/to/paimon/source/table")
+ .writeStream
+ .format("console")
+ .start()
+
+/*
++I 1 Hi
++I 2 Hello
+*/
+```
+
## Spark Type Conversion
This section lists all supported type conversion between Spark and Paimon.
diff --git
a/docs/layouts/shortcodes/generated/spark_connector_configuration.html
b/docs/layouts/shortcodes/generated/spark_connector_configuration.html
index 9f2e6962e..2e6bf03e4 100644
--- a/docs/layouts/shortcodes/generated/spark_connector_configuration.html
+++ b/docs/layouts/shortcodes/generated/spark_connector_configuration.html
@@ -56,6 +56,12 @@ under the License.
<td>Long</td>
<td>The minimum number of rows returned in a single batch, which
used to create MinRowsReadLimit with read.stream.maxTriggerDelayMs
together.</td>
</tr>
+ <tr>
+ <td><h5>read.changelog</h5></td>
+ <td style="word-wrap: break-word;">false</td>
+ <td>Boolean</td>
+ <td>Whether to read row in the form of changelog (add rowkind
column in row to represent its change type).</td>
+ </tr>
<tr>
<td><h5>write.merge-schema</h5></td>
<td style="word-wrap: break-word;">false</td>
diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/TypeUtils.java
b/paimon-common/src/main/java/org/apache/paimon/utils/TypeUtils.java
index 5e1cac68b..6ee72b0b4 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/TypeUtils.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/TypeUtils.java
@@ -51,6 +51,15 @@ public class TypeUtils {
Arrays.stream(mapping).mapToObj(fields::get).collect(Collectors.toList()));
}
+ public static RowType project(RowType inputType, List<String> names) {
+ List<DataField> fields = inputType.getFields();
+ List<String> fieldNames =
fields.stream().map(DataField::name).collect(Collectors.toList());
+ return new RowType(
+ names.stream()
+ .map(k -> fields.get(fieldNames.indexOf(k)))
+ .collect(Collectors.toList()));
+ }
+
public static Object castFromString(String s, DataType type) {
return castFromStringInternal(s, type, false);
}
diff --git
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkConnectorOptions.java
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkConnectorOptions.java
index 2f9e9297c..d2be72706 100644
---
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkConnectorOptions.java
+++
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkConnectorOptions.java
@@ -69,4 +69,11 @@ public class SparkConnectorOptions {
.noDefaultValue()
.withDescription(
"The maximum delay between two adjacent batches,
which used to create MinRowsReadLimit with read.stream.minRowsPerTrigger
together.");
+
+ public static final ConfigOption<Boolean> READ_CHANGELOG =
+ key("read.changelog")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "Whether to read row in the form of changelog (add
rowkind column in row to represent its change type).");
}
diff --git
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkScan.java
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkScan.java
index e277834b5..04d959da6 100644
---
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkScan.java
+++
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkScan.java
@@ -19,7 +19,7 @@
package org.apache.paimon.spark;
import org.apache.paimon.spark.sources.PaimonMicroBatchStream;
-import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.DataTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.source.Split;
@@ -83,7 +83,7 @@ public class SparkScan implements Scan,
SupportsReportStatistics {
@Override
public MicroBatchStream toMicroBatchStream(String checkpointLocation) {
- return new PaimonMicroBatchStream((FileStoreTable) table, readBuilder,
checkpointLocation);
+ return new PaimonMicroBatchStream((DataTable) table, readBuilder,
checkpointLocation);
}
protected List<Split> splits() {
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkSource.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkSource.scala
index 9495cb0fe..b78d56851 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkSource.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkSource.scala
@@ -21,7 +21,8 @@ import org.apache.paimon.catalog.CatalogContext
import org.apache.paimon.options.Options
import org.apache.paimon.spark.commands.WriteIntoPaimonTable
import org.apache.paimon.spark.sources.PaimonSink
-import org.apache.paimon.table.{FileStoreTable, FileStoreTableFactory}
+import org.apache.paimon.table.{DataTable, FileStoreTable,
FileStoreTableFactory}
+import org.apache.paimon.table.system.AuditLogTable
import org.apache.spark.sql.{DataFrame, SaveMode => SparkSaveMode,
SparkSession, SQLContext}
import org.apache.spark.sql.connector.catalog.{SessionConfigSupport, Table}
@@ -70,17 +71,22 @@ class SparkSource
mode: SparkSaveMode,
parameters: Map[String, String],
data: DataFrame): BaseRelation = {
- val table = loadTable(parameters.asJava)
+ val table = loadTable(parameters.asJava).asInstanceOf[FileStoreTable]
WriteIntoPaimonTable(table, SaveMode.transform(mode), data,
Options.fromMap(parameters.asJava))
.run(sqlContext.sparkSession)
SparkSource.toBaseRelation(table, sqlContext)
}
- private def loadTable(options: JMap[String, String]): FileStoreTable = {
+ private def loadTable(options: JMap[String, String]): DataTable = {
val catalogContext = CatalogContext.create(
Options.fromMap(options),
SparkSession.active.sessionState.newHadoopConf())
- FileStoreTableFactory.create(catalogContext)
+ val table = FileStoreTableFactory.create(catalogContext)
+ if (Options.fromMap(options).get(SparkConnectorOptions.READ_CHANGELOG)) {
+ new AuditLogTable(table)
+ } else {
+ table
+ }
}
override def createSink(
@@ -91,7 +97,7 @@ class SparkSource
if (outputMode != OutputMode.Append && outputMode != OutputMode.Complete) {
throw new RuntimeException("Paimon supports only Complete and Append
output mode.")
}
- val table = loadTable(parameters.asJava)
+ val table = loadTable(parameters.asJava).asInstanceOf[FileStoreTable]
val options = Options.fromMap(parameters.asJava)
new PaimonSink(sqlContext, table, partitionColumns, outputMode, options)
}
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/PaimonMicroBatchStream.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/PaimonMicroBatchStream.scala
index f1f8f7e8c..702065a13 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/PaimonMicroBatchStream.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/PaimonMicroBatchStream.scala
@@ -19,7 +19,7 @@ package org.apache.paimon.spark.sources
import org.apache.paimon.options.Options
import org.apache.paimon.spark.{PaimonImplicits, SparkConnectorOptions,
SparkInputPartition, SparkReaderFactory}
-import org.apache.paimon.table.FileStoreTable
+import org.apache.paimon.table.DataTable
import org.apache.paimon.table.source.ReadBuilder
import org.apache.spark.internal.Logging
@@ -29,7 +29,7 @@ import
org.apache.spark.sql.connector.read.streaming.{MicroBatchStream, Offset,
import scala.collection.mutable
class PaimonMicroBatchStream(
- originTable: FileStoreTable,
+ originTable: DataTable,
readBuilder: ReadBuilder,
checkpointLocation: String)
extends MicroBatchStream
@@ -144,6 +144,6 @@ class PaimonMicroBatchStream(
override def stop(): Unit = {}
- override def table: FileStoreTable = originTable
+ override def table: DataTable = originTable
}
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/StreamHelper.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/StreamHelper.scala
index 13afc81a4..ac7d0dc29 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/StreamHelper.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/StreamHelper.scala
@@ -20,11 +20,11 @@ package org.apache.paimon.spark.sources
import org.apache.paimon.CoreOptions
import org.apache.paimon.data.BinaryRow
import org.apache.paimon.spark.SparkTypeUtils
-import org.apache.paimon.spark.commands.WithFileStoreTable
-import org.apache.paimon.table.source.{DataSplit, InnerStreamTableScan,
ScanMode}
+import org.apache.paimon.table.DataTable
+import org.apache.paimon.table.source.{DataSplit, InnerStreamTableScan}
import org.apache.paimon.table.source.TableScan.Plan
import org.apache.paimon.table.source.snapshot.StartingContext
-import org.apache.paimon.utils.RowDataPartitionComputer
+import org.apache.paimon.utils.{RowDataPartitionComputer, TypeUtils}
import org.apache.spark.sql.connector.read.streaming.ReadLimit
import org.apache.spark.sql.execution.datasources.PartitioningUtils
@@ -35,7 +35,9 @@ import scala.collection.mutable
case class IndexedDataSplit(snapshotId: Long, index: Long, entry: DataSplit)
-trait StreamHelper extends WithFileStoreTable {
+trait StreamHelper {
+
+ def table: DataTable
val initOffset: PaimonSourceOffset
@@ -44,12 +46,12 @@ trait StreamHelper extends WithFileStoreTable {
private lazy val streamScan: InnerStreamTableScan = table.newStreamScan()
private lazy val partitionSchema: StructType =
- SparkTypeUtils.fromPaimonRowType(table.schema().logicalPartitionType())
+ SparkTypeUtils.fromPaimonRowType(TypeUtils.project(table.rowType(),
table.partitionKeys()))
private lazy val partitionComputer: RowDataPartitionComputer = new
RowDataPartitionComputer(
- new CoreOptions(table.schema.options).partitionDefaultName,
- table.schema.logicalPartitionType,
- table.schema.partitionKeys.asScala.toArray
+ new CoreOptions(table.options).partitionDefaultName,
+ TypeUtils.project(table.rowType(), table.partitionKeys()),
+ table.partitionKeys().asScala.toArray
)
// Used to get the initial offset.
diff --git
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonCDCSourceTest.scala
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonCDCSourceTest.scala
new file mode 100644
index 000000000..52a59c2da
--- /dev/null
+++
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonCDCSourceTest.scala
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.paimon.spark
+
+import org.apache.spark.sql.{Dataset, Row}
+import org.apache.spark.sql.execution.streaming.MemoryStream
+import org.apache.spark.sql.streaming.StreamTest
+
+class PaimonCDCSourceTest extends PaimonSparkTestBase with StreamTest {
+
+ import testImplicits._
+
+ test("Paimon CDC Source: batch write and streaming read change-log with
default scan mode") {
+ withTempDir {
+ checkpointDir =>
+ val tableName = "T"
+ spark.sql(s"DROP TABLE IF EXISTS $tableName")
+ spark.sql(s"""
+ |CREATE TABLE $tableName (a INT, b STRING)
+ |TBLPROPERTIES (
+ | 'primary-key'='a',
+ | 'write-mode'='change-log',
+ | 'bucket'='2',
+ | 'changelog-producer' = 'lookup')
+ |""".stripMargin)
+
+ spark.sql(s"INSERT INTO $tableName VALUES (1, 'v_1')")
+ spark.sql(s"INSERT INTO $tableName VALUES (2, 'v_2')")
+ spark.sql(s"INSERT INTO $tableName VALUES (2, 'v_2_new')")
+
+ val table = loadTable(tableName)
+ val location = table.location().getPath
+
+ val readStream = spark.readStream
+ .format("paimon")
+ .option("read.changelog", "true")
+ .load(location)
+ .writeStream
+ .format("memory")
+ .option("checkpointLocation", checkpointDir.getCanonicalPath)
+ .queryName("mem_table")
+ .outputMode("append")
+ .start()
+
+ val currentResult = () => spark.sql("SELECT * FROM mem_table")
+ try {
+ readStream.processAllAvailable()
+ val expertResult1 = Row("+I", 1, "v_1") :: Row("+I", 2, "v_2_new")
:: Nil
+ checkAnswer(currentResult(), expertResult1)
+
+ spark.sql(s"INSERT INTO $tableName VALUES (1, 'v_1_new'), (3,
'v_3')")
+ readStream.processAllAvailable()
+ val expertResult2 =
+ Row("+I", 1, "v_1") :: Row("-U", 1, "v_1") :: Row("+U", 1,
"v_1_new") :: Row(
+ "+I",
+ 2,
+ "v_2_new") :: Row("+I", 3, "v_3") :: Nil
+ checkAnswer(currentResult(), expertResult2)
+ } finally {
+ readStream.stop()
+ }
+ }
+ }
+
+ test("Paimon CDC Source: batch write and streaming read change-log with
scan.snapshot-id") {
+ withTempDir {
+ checkpointDir =>
+ val tableName = "T"
+ spark.sql(s"DROP TABLE IF EXISTS $tableName")
+ spark.sql(s"""
+ |CREATE TABLE $tableName (a INT, b STRING)
+ |TBLPROPERTIES (
+ | 'primary-key'='a',
+ | 'write-mode'='change-log',
+ | 'bucket'='2',
+ | 'changelog-producer' = 'lookup')
+ |""".stripMargin)
+
+ spark.sql(s"INSERT INTO $tableName VALUES (1, 'v_1')")
+ spark.sql(s"INSERT INTO $tableName VALUES (2, 'v_2')")
+ spark.sql(s"INSERT INTO $tableName VALUES (2, 'v_2_new')")
+
+ val table = loadTable(tableName)
+ val location = table.location().getPath
+
+ val readStream = spark.readStream
+ .format("paimon")
+ .option("read.changelog", "true")
+ .option("scan.mode", "from-snapshot")
+ .option("scan.snapshot-id", 1)
+ .load(location)
+ .writeStream
+ .format("memory")
+ .option("checkpointLocation", checkpointDir.getCanonicalPath)
+ .queryName("mem_table")
+ .outputMode("append")
+ .start()
+
+ val currentResult = () => spark.sql("SELECT * FROM mem_table")
+ try {
+ readStream.processAllAvailable()
+ val expertResult1 = Row("+I", 1, "v_1") :: Row("+I", 2, "v_2") ::
Row(
+ "-U",
+ 2,
+ "v_2") :: Row("+U", 2, "v_2_new") :: Nil
+ checkAnswer(currentResult(), expertResult1)
+
+ spark.sql(s"INSERT INTO $tableName VALUES (1, 'v_1_new'), (3,
'v_3')")
+ readStream.processAllAvailable()
+ val expertResult2 =
+ Row("+I", 1, "v_1") :: Row("-U", 1, "v_1") :: Row("+U", 1,
"v_1_new") :: Row(
+ "+I",
+ 2,
+ "v_2") :: Row("-U", 2, "v_2") :: Row("+U", 2, "v_2_new") ::
Row("+I", 3, "v_3") :: Nil
+ checkAnswer(currentResult(), expertResult2)
+ } finally {
+ readStream.stop()
+ }
+ }
+ }
+
+ test("Paimon CDC Source: streaming write and streaming read change-log") {
+ withTempDirs {
+ (checkpointDir1, checkpointDir2) =>
+ val tableName = "T"
+ spark.sql(s"DROP TABLE IF EXISTS $tableName")
+ spark.sql(s"""
+ |CREATE TABLE $tableName (a INT, b STRING)
+ |TBLPROPERTIES (
+ | 'primary-key'='a',
+ | 'write-mode'='change-log',
+ | 'bucket'='2',
+ | 'changelog-producer' = 'lookup')
+ |""".stripMargin)
+
+ val table = loadTable(tableName)
+ val location = table.location().getPath
+
+ // streaming write
+ val inputData = MemoryStream[(Int, String)]
+ val writeStream = inputData
+ .toDS()
+ .toDF("a", "b")
+ .writeStream
+ .option("checkpointLocation", checkpointDir1.getCanonicalPath)
+ .foreachBatch {
+ (batch: Dataset[Row], _: Long) =>
+ batch.write.format("paimon").mode("append").save(location)
+ }
+ .start()
+
+ // streaming read
+ val readStream = spark.readStream
+ .format("paimon")
+ .option("read.changelog", "true")
+ .option("scan.mode", "from-snapshot")
+ .option("scan.snapshot-id", 1)
+ .load(location)
+ .writeStream
+ .format("memory")
+ .option("checkpointLocation", checkpointDir2.getCanonicalPath)
+ .queryName("mem_table")
+ .outputMode("append")
+ .start()
+
+ val currentResult = () => spark.sql("SELECT * FROM mem_table")
+ try {
+ inputData.addData((1, "v_1"))
+ writeStream.processAllAvailable()
+ readStream.processAllAvailable()
+ val expertResult1 = Row("+I", 1, "v_1") :: Nil
+ checkAnswer(currentResult(), expertResult1)
+
+ inputData.addData((2, "v_2"))
+ writeStream.processAllAvailable()
+ readStream.processAllAvailable()
+ val expertResult2 = Row("+I", 1, "v_1") :: Row("+I", 2, "v_2") :: Nil
+ checkAnswer(currentResult(), expertResult2)
+
+ inputData.addData((2, "v_2_new"))
+ writeStream.processAllAvailable()
+ readStream.processAllAvailable()
+ val expertResult3 = Row("+I", 1, "v_1") :: Row("+I", 2, "v_2") ::
Row(
+ "-U",
+ 2,
+ "v_2") :: Row("+U", 2, "v_2_new") :: Nil
+ checkAnswer(currentResult(), expertResult3)
+
+ inputData.addData((1, "v_1_new"), (3, "v_3"))
+ writeStream.processAllAvailable()
+ readStream.processAllAvailable()
+ val expertResult4 =
+ Row("+I", 1, "v_1") :: Row("-U", 1, "v_1") :: Row("+U", 1,
"v_1_new") :: Row(
+ "+I",
+ 2,
+ "v_2") :: Row("-U", 2, "v_2") :: Row("+U", 2, "v_2_new") ::
Row("+I", 3, "v_3") :: Nil
+ checkAnswer(currentResult(), expertResult4)
+ } finally {
+ readStream.stop()
+ }
+ }
+ }
+
+}