This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new fe7ff3d0c [spark] Spark supports read changelog (#2107)
fe7ff3d0c is described below

commit fe7ff3d0c937f8b9d90ee0017e780545aa7bb6d4
Author: Zouxxyy <[email protected]>
AuthorDate: Tue Oct 10 18:27:44 2023 +0800

    [spark] Spark supports read changelog (#2107)
---
 docs/content/engines/spark3.md                     |  21 ++
 .../generated/spark_connector_configuration.html   |   6 +
 .../java/org/apache/paimon/utils/TypeUtils.java    |   9 +
 .../apache/paimon/spark/SparkConnectorOptions.java |   7 +
 .../java/org/apache/paimon/spark/SparkScan.java    |   4 +-
 .../org/apache/paimon/spark/SparkSource.scala      |  16 +-
 .../spark/sources/PaimonMicroBatchStream.scala     |   6 +-
 .../apache/paimon/spark/sources/StreamHelper.scala |  18 +-
 .../apache/paimon/spark/PaimonCDCSourceTest.scala  | 219 +++++++++++++++++++++
 9 files changed, 288 insertions(+), 18 deletions(-)

diff --git a/docs/content/engines/spark3.md b/docs/content/engines/spark3.md
index e659d6dbf..e1c113e44 100644
--- a/docs/content/engines/spark3.md
+++ b/docs/content/engines/spark3.md
@@ -402,6 +402,27 @@ val query = spark.readStream
   .start()
 ```
 
+Paimon Structured Streaming supports read row in the form of changelog (add 
rowkind column in row to represent its 
+change type) by setting `read.changelog` to true (default is false).
+
+**Example:**
+
+```scala
+// no any scan-related configs are provided, that will use latest-full scan 
mode.
+val query = spark.readStream
+  .format("paimon")
+  .option("read.changelog", "true")
+  .load("/path/to/paimon/source/table")
+  .writeStream
+  .format("console")
+  .start()
+
+/*
++I   1  Hi
++I   2  Hello
+*/
+```
+
 ## Spark Type Conversion
 
 This section lists all supported type conversion between Spark and Paimon.
diff --git 
a/docs/layouts/shortcodes/generated/spark_connector_configuration.html 
b/docs/layouts/shortcodes/generated/spark_connector_configuration.html
index 9f2e6962e..2e6bf03e4 100644
--- a/docs/layouts/shortcodes/generated/spark_connector_configuration.html
+++ b/docs/layouts/shortcodes/generated/spark_connector_configuration.html
@@ -56,6 +56,12 @@ under the License.
             <td>Long</td>
             <td>The minimum number of rows returned in a single batch, which 
used to create MinRowsReadLimit with read.stream.maxTriggerDelayMs 
together.</td>
         </tr>
+        <tr>
+            <td><h5>read.changelog</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>Whether to read row in the form of changelog (add rowkind 
column in row to represent its change type).</td>
+        </tr>
         <tr>
             <td><h5>write.merge-schema</h5></td>
             <td style="word-wrap: break-word;">false</td>
diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/TypeUtils.java 
b/paimon-common/src/main/java/org/apache/paimon/utils/TypeUtils.java
index 5e1cac68b..6ee72b0b4 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/TypeUtils.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/TypeUtils.java
@@ -51,6 +51,15 @@ public class TypeUtils {
                 
Arrays.stream(mapping).mapToObj(fields::get).collect(Collectors.toList()));
     }
 
+    public static RowType project(RowType inputType, List<String> names) {
+        List<DataField> fields = inputType.getFields();
+        List<String> fieldNames = 
fields.stream().map(DataField::name).collect(Collectors.toList());
+        return new RowType(
+                names.stream()
+                        .map(k -> fields.get(fieldNames.indexOf(k)))
+                        .collect(Collectors.toList()));
+    }
+
     public static Object castFromString(String s, DataType type) {
         return castFromStringInternal(s, type, false);
     }
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkConnectorOptions.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkConnectorOptions.java
index 2f9e9297c..d2be72706 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkConnectorOptions.java
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkConnectorOptions.java
@@ -69,4 +69,11 @@ public class SparkConnectorOptions {
                     .noDefaultValue()
                     .withDescription(
                             "The maximum delay between two adjacent batches, 
which used to create MinRowsReadLimit with read.stream.minRowsPerTrigger 
together.");
+
+    public static final ConfigOption<Boolean> READ_CHANGELOG =
+            key("read.changelog")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Whether to read row in the form of changelog (add 
rowkind column in row to represent its change type).");
 }
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkScan.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkScan.java
index e277834b5..04d959da6 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkScan.java
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkScan.java
@@ -19,7 +19,7 @@
 package org.apache.paimon.spark;
 
 import org.apache.paimon.spark.sources.PaimonMicroBatchStream;
-import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.DataTable;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.table.source.ReadBuilder;
 import org.apache.paimon.table.source.Split;
@@ -83,7 +83,7 @@ public class SparkScan implements Scan, 
SupportsReportStatistics {
 
     @Override
     public MicroBatchStream toMicroBatchStream(String checkpointLocation) {
-        return new PaimonMicroBatchStream((FileStoreTable) table, readBuilder, 
checkpointLocation);
+        return new PaimonMicroBatchStream((DataTable) table, readBuilder, 
checkpointLocation);
     }
 
     protected List<Split> splits() {
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkSource.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkSource.scala
index 9495cb0fe..b78d56851 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkSource.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkSource.scala
@@ -21,7 +21,8 @@ import org.apache.paimon.catalog.CatalogContext
 import org.apache.paimon.options.Options
 import org.apache.paimon.spark.commands.WriteIntoPaimonTable
 import org.apache.paimon.spark.sources.PaimonSink
-import org.apache.paimon.table.{FileStoreTable, FileStoreTableFactory}
+import org.apache.paimon.table.{DataTable, FileStoreTable, 
FileStoreTableFactory}
+import org.apache.paimon.table.system.AuditLogTable
 
 import org.apache.spark.sql.{DataFrame, SaveMode => SparkSaveMode, 
SparkSession, SQLContext}
 import org.apache.spark.sql.connector.catalog.{SessionConfigSupport, Table}
@@ -70,17 +71,22 @@ class SparkSource
       mode: SparkSaveMode,
       parameters: Map[String, String],
       data: DataFrame): BaseRelation = {
-    val table = loadTable(parameters.asJava)
+    val table = loadTable(parameters.asJava).asInstanceOf[FileStoreTable]
     WriteIntoPaimonTable(table, SaveMode.transform(mode), data, 
Options.fromMap(parameters.asJava))
       .run(sqlContext.sparkSession)
     SparkSource.toBaseRelation(table, sqlContext)
   }
 
-  private def loadTable(options: JMap[String, String]): FileStoreTable = {
+  private def loadTable(options: JMap[String, String]): DataTable = {
     val catalogContext = CatalogContext.create(
       Options.fromMap(options),
       SparkSession.active.sessionState.newHadoopConf())
-    FileStoreTableFactory.create(catalogContext)
+    val table = FileStoreTableFactory.create(catalogContext)
+    if (Options.fromMap(options).get(SparkConnectorOptions.READ_CHANGELOG)) {
+      new AuditLogTable(table)
+    } else {
+      table
+    }
   }
 
   override def createSink(
@@ -91,7 +97,7 @@ class SparkSource
     if (outputMode != OutputMode.Append && outputMode != OutputMode.Complete) {
       throw new RuntimeException("Paimon supports only Complete and Append 
output mode.")
     }
-    val table = loadTable(parameters.asJava)
+    val table = loadTable(parameters.asJava).asInstanceOf[FileStoreTable]
     val options = Options.fromMap(parameters.asJava)
     new PaimonSink(sqlContext, table, partitionColumns, outputMode, options)
   }
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/PaimonMicroBatchStream.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/PaimonMicroBatchStream.scala
index f1f8f7e8c..702065a13 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/PaimonMicroBatchStream.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/PaimonMicroBatchStream.scala
@@ -19,7 +19,7 @@ package org.apache.paimon.spark.sources
 
 import org.apache.paimon.options.Options
 import org.apache.paimon.spark.{PaimonImplicits, SparkConnectorOptions, 
SparkInputPartition, SparkReaderFactory}
-import org.apache.paimon.table.FileStoreTable
+import org.apache.paimon.table.DataTable
 import org.apache.paimon.table.source.ReadBuilder
 
 import org.apache.spark.internal.Logging
@@ -29,7 +29,7 @@ import 
org.apache.spark.sql.connector.read.streaming.{MicroBatchStream, Offset,
 import scala.collection.mutable
 
 class PaimonMicroBatchStream(
-    originTable: FileStoreTable,
+    originTable: DataTable,
     readBuilder: ReadBuilder,
     checkpointLocation: String)
   extends MicroBatchStream
@@ -144,6 +144,6 @@ class PaimonMicroBatchStream(
 
   override def stop(): Unit = {}
 
-  override def table: FileStoreTable = originTable
+  override def table: DataTable = originTable
 
 }
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/StreamHelper.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/StreamHelper.scala
index 13afc81a4..ac7d0dc29 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/StreamHelper.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/StreamHelper.scala
@@ -20,11 +20,11 @@ package org.apache.paimon.spark.sources
 import org.apache.paimon.CoreOptions
 import org.apache.paimon.data.BinaryRow
 import org.apache.paimon.spark.SparkTypeUtils
-import org.apache.paimon.spark.commands.WithFileStoreTable
-import org.apache.paimon.table.source.{DataSplit, InnerStreamTableScan, 
ScanMode}
+import org.apache.paimon.table.DataTable
+import org.apache.paimon.table.source.{DataSplit, InnerStreamTableScan}
 import org.apache.paimon.table.source.TableScan.Plan
 import org.apache.paimon.table.source.snapshot.StartingContext
-import org.apache.paimon.utils.RowDataPartitionComputer
+import org.apache.paimon.utils.{RowDataPartitionComputer, TypeUtils}
 
 import org.apache.spark.sql.connector.read.streaming.ReadLimit
 import org.apache.spark.sql.execution.datasources.PartitioningUtils
@@ -35,7 +35,9 @@ import scala.collection.mutable
 
 case class IndexedDataSplit(snapshotId: Long, index: Long, entry: DataSplit)
 
-trait StreamHelper extends WithFileStoreTable {
+trait StreamHelper {
+
+  def table: DataTable
 
   val initOffset: PaimonSourceOffset
 
@@ -44,12 +46,12 @@ trait StreamHelper extends WithFileStoreTable {
   private lazy val streamScan: InnerStreamTableScan = table.newStreamScan()
 
   private lazy val partitionSchema: StructType =
-    SparkTypeUtils.fromPaimonRowType(table.schema().logicalPartitionType())
+    SparkTypeUtils.fromPaimonRowType(TypeUtils.project(table.rowType(), 
table.partitionKeys()))
 
   private lazy val partitionComputer: RowDataPartitionComputer = new 
RowDataPartitionComputer(
-    new CoreOptions(table.schema.options).partitionDefaultName,
-    table.schema.logicalPartitionType,
-    table.schema.partitionKeys.asScala.toArray
+    new CoreOptions(table.options).partitionDefaultName,
+    TypeUtils.project(table.rowType(), table.partitionKeys()),
+    table.partitionKeys().asScala.toArray
   )
 
   // Used to get the initial offset.
diff --git 
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonCDCSourceTest.scala
 
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonCDCSourceTest.scala
new file mode 100644
index 000000000..52a59c2da
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonCDCSourceTest.scala
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.paimon.spark
+
+import org.apache.spark.sql.{Dataset, Row}
+import org.apache.spark.sql.execution.streaming.MemoryStream
+import org.apache.spark.sql.streaming.StreamTest
+
+class PaimonCDCSourceTest extends PaimonSparkTestBase with StreamTest {
+
+  import testImplicits._
+
+  test("Paimon CDC Source: batch write and streaming read change-log with 
default scan mode") {
+    withTempDir {
+      checkpointDir =>
+        val tableName = "T"
+        spark.sql(s"DROP TABLE IF EXISTS $tableName")
+        spark.sql(s"""
+                     |CREATE TABLE $tableName (a INT, b STRING)
+                     |TBLPROPERTIES (
+                     |  'primary-key'='a',
+                     |  'write-mode'='change-log',
+                     |  'bucket'='2',
+                     |  'changelog-producer' = 'lookup')
+                     |""".stripMargin)
+
+        spark.sql(s"INSERT INTO $tableName VALUES (1, 'v_1')")
+        spark.sql(s"INSERT INTO $tableName VALUES (2, 'v_2')")
+        spark.sql(s"INSERT INTO $tableName VALUES (2, 'v_2_new')")
+
+        val table = loadTable(tableName)
+        val location = table.location().getPath
+
+        val readStream = spark.readStream
+          .format("paimon")
+          .option("read.changelog", "true")
+          .load(location)
+          .writeStream
+          .format("memory")
+          .option("checkpointLocation", checkpointDir.getCanonicalPath)
+          .queryName("mem_table")
+          .outputMode("append")
+          .start()
+
+        val currentResult = () => spark.sql("SELECT * FROM mem_table")
+        try {
+          readStream.processAllAvailable()
+          val expertResult1 = Row("+I", 1, "v_1") :: Row("+I", 2, "v_2_new") 
:: Nil
+          checkAnswer(currentResult(), expertResult1)
+
+          spark.sql(s"INSERT INTO $tableName VALUES (1, 'v_1_new'), (3, 
'v_3')")
+          readStream.processAllAvailable()
+          val expertResult2 =
+            Row("+I", 1, "v_1") :: Row("-U", 1, "v_1") :: Row("+U", 1, 
"v_1_new") :: Row(
+              "+I",
+              2,
+              "v_2_new") :: Row("+I", 3, "v_3") :: Nil
+          checkAnswer(currentResult(), expertResult2)
+        } finally {
+          readStream.stop()
+        }
+    }
+  }
+
+  test("Paimon CDC Source: batch write and streaming read change-log with 
scan.snapshot-id") {
+    withTempDir {
+      checkpointDir =>
+        val tableName = "T"
+        spark.sql(s"DROP TABLE IF EXISTS $tableName")
+        spark.sql(s"""
+                     |CREATE TABLE $tableName (a INT, b STRING)
+                     |TBLPROPERTIES (
+                     |  'primary-key'='a',
+                     |  'write-mode'='change-log',
+                     |  'bucket'='2',
+                     |  'changelog-producer' = 'lookup')
+                     |""".stripMargin)
+
+        spark.sql(s"INSERT INTO $tableName VALUES (1, 'v_1')")
+        spark.sql(s"INSERT INTO $tableName VALUES (2, 'v_2')")
+        spark.sql(s"INSERT INTO $tableName VALUES (2, 'v_2_new')")
+
+        val table = loadTable(tableName)
+        val location = table.location().getPath
+
+        val readStream = spark.readStream
+          .format("paimon")
+          .option("read.changelog", "true")
+          .option("scan.mode", "from-snapshot")
+          .option("scan.snapshot-id", 1)
+          .load(location)
+          .writeStream
+          .format("memory")
+          .option("checkpointLocation", checkpointDir.getCanonicalPath)
+          .queryName("mem_table")
+          .outputMode("append")
+          .start()
+
+        val currentResult = () => spark.sql("SELECT * FROM mem_table")
+        try {
+          readStream.processAllAvailable()
+          val expertResult1 = Row("+I", 1, "v_1") :: Row("+I", 2, "v_2") :: 
Row(
+            "-U",
+            2,
+            "v_2") :: Row("+U", 2, "v_2_new") :: Nil
+          checkAnswer(currentResult(), expertResult1)
+
+          spark.sql(s"INSERT INTO $tableName VALUES (1, 'v_1_new'), (3, 
'v_3')")
+          readStream.processAllAvailable()
+          val expertResult2 =
+            Row("+I", 1, "v_1") :: Row("-U", 1, "v_1") :: Row("+U", 1, 
"v_1_new") :: Row(
+              "+I",
+              2,
+              "v_2") :: Row("-U", 2, "v_2") :: Row("+U", 2, "v_2_new") :: 
Row("+I", 3, "v_3") :: Nil
+          checkAnswer(currentResult(), expertResult2)
+        } finally {
+          readStream.stop()
+        }
+    }
+  }
+
+  test("Paimon CDC Source: streaming write and streaming read change-log") {
+    withTempDirs {
+      (checkpointDir1, checkpointDir2) =>
+        val tableName = "T"
+        spark.sql(s"DROP TABLE IF EXISTS $tableName")
+        spark.sql(s"""
+                     |CREATE TABLE $tableName (a INT, b STRING)
+                     |TBLPROPERTIES (
+                     |  'primary-key'='a',
+                     |  'write-mode'='change-log',
+                     |  'bucket'='2',
+                     |  'changelog-producer' = 'lookup')
+                     |""".stripMargin)
+
+        val table = loadTable(tableName)
+        val location = table.location().getPath
+
+        // streaming write
+        val inputData = MemoryStream[(Int, String)]
+        val writeStream = inputData
+          .toDS()
+          .toDF("a", "b")
+          .writeStream
+          .option("checkpointLocation", checkpointDir1.getCanonicalPath)
+          .foreachBatch {
+            (batch: Dataset[Row], _: Long) =>
+              batch.write.format("paimon").mode("append").save(location)
+          }
+          .start()
+
+        // streaming read
+        val readStream = spark.readStream
+          .format("paimon")
+          .option("read.changelog", "true")
+          .option("scan.mode", "from-snapshot")
+          .option("scan.snapshot-id", 1)
+          .load(location)
+          .writeStream
+          .format("memory")
+          .option("checkpointLocation", checkpointDir2.getCanonicalPath)
+          .queryName("mem_table")
+          .outputMode("append")
+          .start()
+
+        val currentResult = () => spark.sql("SELECT * FROM mem_table")
+        try {
+          inputData.addData((1, "v_1"))
+          writeStream.processAllAvailable()
+          readStream.processAllAvailable()
+          val expertResult1 = Row("+I", 1, "v_1") :: Nil
+          checkAnswer(currentResult(), expertResult1)
+
+          inputData.addData((2, "v_2"))
+          writeStream.processAllAvailable()
+          readStream.processAllAvailable()
+          val expertResult2 = Row("+I", 1, "v_1") :: Row("+I", 2, "v_2") :: Nil
+          checkAnswer(currentResult(), expertResult2)
+
+          inputData.addData((2, "v_2_new"))
+          writeStream.processAllAvailable()
+          readStream.processAllAvailable()
+          val expertResult3 = Row("+I", 1, "v_1") :: Row("+I", 2, "v_2") :: 
Row(
+            "-U",
+            2,
+            "v_2") :: Row("+U", 2, "v_2_new") :: Nil
+          checkAnswer(currentResult(), expertResult3)
+
+          inputData.addData((1, "v_1_new"), (3, "v_3"))
+          writeStream.processAllAvailable()
+          readStream.processAllAvailable()
+          val expertResult4 =
+            Row("+I", 1, "v_1") :: Row("-U", 1, "v_1") :: Row("+U", 1, 
"v_1_new") :: Row(
+              "+I",
+              2,
+              "v_2") :: Row("-U", 2, "v_2") :: Row("+U", 2, "v_2_new") :: 
Row("+I", 3, "v_3") :: Nil
+          checkAnswer(currentResult(), expertResult4)
+        } finally {
+          readStream.stop()
+        }
+    }
+  }
+
+}

Reply via email to