This is an automated email from the ASF dual-hosted git repository.

wangzhen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new fb2b7ef9c6 [KYUUBI #7126][LINEAGE] Support merge into syntax in row 
level catalog
fb2b7ef9c6 is described below

commit fb2b7ef9c6863a442bf7f29028d4497c65438408
Author: chenliang.lu <[email protected]>
AuthorDate: Wed Aug 13 17:35:41 2025 +0800

    [KYUUBI #7126][LINEAGE] Support merge into syntax in row level catalog
    
    ### Why are the changes needed?
    
    In catalogs that support the row-level operation interface (e.g. Iceberg),
    MERGE INTO will be rewritten as WriteDelta or ReplaceData by the rewrite
    rule. We should support the extraction of lineage relationships for these
    plan types.
    
    ### How was this patch tested?
    
    Add new tests for the row-level catalog.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    no
    
    Closes #7127 from yabola/master-listener.
    
    Closes #7126
    
    e39d0d93b [chenliang.lu] add for notMatchedInstructions
    1530d7c6c [chenliang.lu] add notMatchedInstructions
    00660f83b [chenliang.lu] update spark version
    db7d9ca85 [chenliang.lu] update spark version && optimize some code
    85de5b069 [chenliang.lu] remove useless logger && fix ut
    79d2f9bfa [chenliang.lu] fix check style
    2f5e62cdd [chenliang.lu] [KYUUBI #7126][LINEAGE] Support merge into syntax 
in row level catalog
    
    Authored-by: chenliang.lu <[email protected]>
    Signed-off-by: wforget <[email protected]>
---
 extensions/spark/kyuubi-spark-lineage/README.md    |   4 +-
 .../helper/SparkSQLLineageParseHelper.scala        |  46 +++++
 .../helper/RowLevelCatalogLineageParserSuite.scala | 221 +++++++++++++++++++++
 .../helper/SparkSQLLineageParserHelperSuite.scala  | 116 +++--------
 .../helper/TableCatalogLineageParserSuite.scala    |  86 ++++++++
 5 files changed, 385 insertions(+), 88 deletions(-)

diff --git a/extensions/spark/kyuubi-spark-lineage/README.md 
b/extensions/spark/kyuubi-spark-lineage/README.md
index 3f24cd1730..6b3eeb902b 100644
--- a/extensions/spark/kyuubi-spark-lineage/README.md
+++ b/extensions/spark/kyuubi-spark-lineage/README.md
@@ -26,7 +26,7 @@
 ## Build
 
 ```shell
-build/mvn clean package -DskipTests -pl :kyuubi-spark-lineage_2.12 -am 
-Dspark.version=3.2.1
+build/mvn clean package -DskipTests -pl :kyuubi-spark-lineage_2.12 -am 
-Dspark.version=3.5.1
 ```
 
 ### Supported Apache Spark Versions
@@ -37,6 +37,4 @@ build/mvn clean package -DskipTests -pl 
:kyuubi-spark-lineage_2.12 -am -Dspark.v
 - [x] 3.5.x (default)
 - [x] 3.4.x
 - [x] 3.3.x
-- [x] 3.2.x
-- [x] 3.1.x
 
diff --git 
a/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParseHelper.scala
 
b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParseHelper.scala
index a6cb6934f6..bcab9b74fe 100644
--- 
a/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParseHelper.scala
+++ 
b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParseHelper.scala
@@ -46,6 +46,9 @@ trait LineageParser {
   val SUBQUERY_COLUMN_IDENTIFIER = "__subquery__"
   val AGGREGATE_COUNT_COLUMN_IDENTIFIER = "__count__"
   val LOCAL_TABLE_IDENTIFIER = "__local__"
+  val METADATA_COL_ATTR_KEY = "__metadata_col"
+  val ORIGINAL_ROW_ID_VALUE_PREFIX: String = "__original_row_id_"
+  val OPERATION_COLUMN: String = "__row_operation"
 
   type AttributeMap[A] = ListMap[Attribute, A]
 
@@ -307,7 +310,37 @@ trait LineageParser {
         extractColumnsLineage(getQuery(plan), parentColumnsLineage).map { case 
(k, v) =>
           k.withName(s"$table.${k.name}") -> v
         }
+      case p if p.nodeName == "MergeRows" =>
+        val instructionsOutputs =
+          getField[Seq[Expression]](p, "matchedInstructions")
+            .map(extractInstructionOutputs) ++
+            getField[Seq[Expression]](p, "notMatchedInstructions")
+              .map(extractInstructionOutputs) ++
+            getField[Seq[Expression]](p, "notMatchedBySourceInstructions")
+              .map(extractInstructionOutputs)
+        val nextColumnsLineage = ListMap(p.output.indices.map { index =>
+          val keyAttr = p.output(index)
+          val instructionOutputs = instructionsOutputs.map(_(index))
+          (keyAttr, instructionOutputs)
+        }.collect {
+          case (keyAttr: Attribute, instructionsOutput)
+              if instructionsOutput
+                .exists(_.references.nonEmpty) =>
+            val attributeSet = AttributeSet.apply(instructionsOutput)
+            keyAttr -> attributeSet
+        }: _*)
+        p.children.map(
+          extractColumnsLineage(_, 
nextColumnsLineage)).reduce(mergeColumnsLineage)
 
+      case p if p.nodeName == "WriteDelta" || p.nodeName == "ReplaceData" =>
+        val table = getV2TableName(getField[NamedRelation](plan, "table"))
+        val query = getQuery(plan)
+        val columnsLineage = extractColumnsLineage(query, parentColumnsLineage)
+        columnsLineage
+          .filter { case (k, _) => !isMetadataAttr(k) }
+          .map { case (k, v) =>
+            k.withName(s"$table.${k.name}") -> v
+          }
       case p if p.nodeName == "MergeIntoTable" =>
         val matchedActions = getField[Seq[MergeAction]](plan, "matchedActions")
         val notMatchedActions = getField[Seq[MergeAction]](plan, 
"notMatchedActions")
@@ -514,6 +547,19 @@ trait LineageParser {
       case _ => qualifiedName
     }
   }
+
+  private def isMetadataAttr(attr: Attribute): Boolean = {
+    attr.metadata.contains(METADATA_COL_ATTR_KEY) ||
+    attr.name.startsWith(ORIGINAL_ROW_ID_VALUE_PREFIX) ||
+    attr.name.startsWith(OPERATION_COLUMN)
+  }
+
+  private def extractInstructionOutputs(instruction: Expression): 
Seq[Expression] = {
+    instruction match {
+      case p if p.nodeName == "Split" => getField[Seq[Expression]](p, 
"otherOutput")
+      case p => getField[Seq[Expression]](p, "output")
+    }
+  }
 }
 
 case class SparkSQLLineageParseHelper(sparkSession: SparkSession) extends 
LineageParser
diff --git 
a/extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/helper/RowLevelCatalogLineageParserSuite.scala
 
b/extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/helper/RowLevelCatalogLineageParserSuite.scala
new file mode 100644
index 0000000000..81b6d85e25
--- /dev/null
+++ 
b/extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/helper/RowLevelCatalogLineageParserSuite.scala
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.plugin.lineage.helper
+
+import org.apache.kyuubi.plugin.lineage.Lineage
+import 
org.apache.kyuubi.plugin.lineage.helper.SparkListenerHelper.SPARK_RUNTIME_VERSION
+
+class RowLevelCatalogLineageParserSuite extends 
SparkSQLLineageParserHelperSuite {
+
+  override def catalogName: String = {
+    
"org.apache.spark.sql.connector.catalog.InMemoryRowLevelOperationTableCatalog"
+  }
+
+  test("columns lineage extract - WriteDelta") {
+    assume(
+      SPARK_RUNTIME_VERSION >= "3.5",
+      "WriteDelta is only supported in SPARK_RUNTIME_VERSION >= 3.5")
+    val ddls =
+      """
+        |create table v2_catalog.db.target_t(pk int not null, name string, 
price float)
+        | TBLPROPERTIES ('supports-deltas'='true');
+        |create table v2_catalog.db.source_t(pk int not null, name string, 
price float)
+        | TBLPROPERTIES ('supports-deltas'='true');
+        |create table v2_catalog.db.pivot_t(pk int not null, price float)
+        | TBLPROPERTIES ('supports-deltas'='true')
+        |""".stripMargin
+    ddls.split(";").filter(_.nonEmpty).foreach(spark.sql(_).collect())
+
+    withTable("v2_catalog.db.target_t", "v2_catalog.db.source_t", 
"v2_catalog.db.pivot_t") { _ =>
+      val ret0 = extractLineageWithoutExecuting(
+        "MERGE INTO v2_catalog.db.target_t AS target " +
+          "USING v2_catalog.db.source_t AS source " +
+          "ON target.pk = source.pk " +
+          "WHEN MATCHED THEN " +
+          "  UPDATE SET target.name = source.name, target.price = source.price 
" +
+          "WHEN NOT MATCHED THEN " +
+          "  INSERT (pk, name, price) VALUES (cast(source.pk as int), 
source.name, source.price)" +
+          "WHEN NOT MATCHED BY SOURCE THEN  UPDATE SET target.name = 'abc' ")
+      assert(ret0 == Lineage(
+        List("v2_catalog.db.source_t", "v2_catalog.db.target_t"),
+        List("v2_catalog.db.target_t"),
+        List(
+          (
+            "v2_catalog.db.target_t.pk",
+            Set("v2_catalog.db.source_t.pk", "v2_catalog.db.target_t.pk")),
+          ("v2_catalog.db.target_t.name", Set("v2_catalog.db.source_t.name")),
+          (
+            "v2_catalog.db.target_t.price",
+            Set("v2_catalog.db.source_t.price", 
"v2_catalog.db.target_t.price")))))
+
+      val ret1 = extractLineageWithoutExecuting(
+        "MERGE INTO v2_catalog.db.target_t AS target " +
+          "USING v2_catalog.db.source_t AS source " +
+          "ON target.pk = source.pk " +
+          "WHEN MATCHED THEN " +
+          "  UPDATE SET * " +
+          "WHEN NOT MATCHED THEN " +
+          "  INSERT *")
+      assert(ret1 == Lineage(
+        List("v2_catalog.db.source_t"),
+        List("v2_catalog.db.target_t"),
+        List(
+          ("v2_catalog.db.target_t.pk", Set("v2_catalog.db.source_t.pk")),
+          ("v2_catalog.db.target_t.name", Set("v2_catalog.db.source_t.name")),
+          ("v2_catalog.db.target_t.price", 
Set("v2_catalog.db.source_t.price")))))
+
+      val ret2 = extractLineageWithoutExecuting(
+        "MERGE INTO v2_catalog.db.target_t AS target " +
+          "USING (select a.pk, a.name, b.price " +
+          "from v2_catalog.db.source_t a join " +
+          "v2_catalog.db.pivot_t b) AS source " +
+          "ON target.pk = source.pk " +
+          "WHEN MATCHED THEN " +
+          "  UPDATE SET * " +
+          "WHEN NOT MATCHED THEN " +
+          "  INSERT *")
+
+      assert(ret2 == Lineage(
+        List("v2_catalog.db.source_t", "v2_catalog.db.pivot_t"),
+        List("v2_catalog.db.target_t"),
+        List(
+          ("v2_catalog.db.target_t.pk", Set("v2_catalog.db.source_t.pk")),
+          ("v2_catalog.db.target_t.name", Set("v2_catalog.db.source_t.name")),
+          ("v2_catalog.db.target_t.price", 
Set("v2_catalog.db.pivot_t.price")))))
+
+      val ret3 = extractLineageWithoutExecuting(
+        "update v2_catalog.db.target_t AS set name='abc' where price < 10 ")
+      assert(ret3 == Lineage(
+        List("v2_catalog.db.target_t"),
+        List("v2_catalog.db.target_t"),
+        List(
+          ("v2_catalog.db.target_t.pk", Set("v2_catalog.db.target_t.pk")),
+          ("v2_catalog.db.target_t.name", Set()),
+          ("v2_catalog.db.target_t.price", 
Set("v2_catalog.db.target_t.price")))))
+    }
+  }
+
+  test("columns lineage extract - ReplaceData") {
+    assume(
+      SPARK_RUNTIME_VERSION >= "3.5",
+      "ReplaceData[SPARK-43963] for merge into is supported in 
SPARK_RUNTIME_VERSION >= 3.5")
+    val ddls =
+      """
+        |create table v2_catalog.db.target_t(id int, name string, price float)
+        |create table v2_catalog.db.source_t(id int, name string, price float)
+        |create table v2_catalog.db.pivot_t(id int, price float)
+        |""".stripMargin
+    ddls.split("\n").filter(_.nonEmpty).foreach(spark.sql(_).collect())
+    withTable("v2_catalog.db.target_t", "v2_catalog.db.source_t", 
"v2_catalog.db.pivot_t") { _ =>
+      val ret0 = extractLineageWithoutExecuting("MERGE INTO 
v2_catalog.db.target_t AS target " +
+        "USING v2_catalog.db.source_t AS source " +
+        "ON target.id = source.id " +
+        "WHEN MATCHED THEN " +
+        "  UPDATE SET target.name = source.name, target.price = source.price " 
+
+        "WHEN NOT MATCHED THEN " +
+        "  INSERT (id, name, price) VALUES (cast(source.id as int), 
source.name, source.price)")
+
+      /**
+       * The ReplaceData operation requires that target records which are read 
but do not match
+       * any of the MATCHED or NOT MATCHED BY SOURCE clauses also be copied.
+       * (refer to [[RewriteMergeIntoTable#buildReplaceDataMergeRowsPlan]])
+       */
+      assert(ret0 == Lineage(
+        List("v2_catalog.db.source_t", "v2_catalog.db.target_t"),
+        List("v2_catalog.db.target_t"),
+        List(
+          (
+            "v2_catalog.db.target_t.id",
+            Set("v2_catalog.db.source_t.id", "v2_catalog.db.target_t.id")),
+          (
+            "v2_catalog.db.target_t.name",
+            Set("v2_catalog.db.source_t.name", "v2_catalog.db.target_t.name")),
+          (
+            "v2_catalog.db.target_t.price",
+            Set("v2_catalog.db.source_t.price", 
"v2_catalog.db.target_t.price")))))
+
+      val ret1 = extractLineageWithoutExecuting("MERGE INTO 
v2_catalog.db.target_t AS target " +
+        "USING v2_catalog.db.source_t AS source " +
+        "ON target.id = source.id " +
+        "WHEN MATCHED THEN " +
+        "  UPDATE SET * " +
+        "WHEN NOT MATCHED THEN " +
+        "  INSERT *")
+      assert(ret1 == Lineage(
+        List("v2_catalog.db.source_t", "v2_catalog.db.target_t"),
+        List("v2_catalog.db.target_t"),
+        List(
+          (
+            "v2_catalog.db.target_t.id",
+            Set("v2_catalog.db.source_t.id", "v2_catalog.db.target_t.id")),
+          (
+            "v2_catalog.db.target_t.name",
+            Set("v2_catalog.db.source_t.name", "v2_catalog.db.target_t.name")),
+          (
+            "v2_catalog.db.target_t.price",
+            Set("v2_catalog.db.source_t.price", 
"v2_catalog.db.target_t.price")))))
+
+      val ret2 = extractLineageWithoutExecuting("MERGE INTO 
v2_catalog.db.target_t AS target " +
+        "USING (select a.id, a.name, b.price " +
+        "from v2_catalog.db.source_t a join v2_catalog.db.pivot_t b) AS source 
" +
+        "ON target.id = source.id " +
+        "WHEN MATCHED THEN " +
+        "  UPDATE SET * " +
+        "WHEN NOT MATCHED THEN " +
+        "  INSERT *")
+
+      assert(ret2 == Lineage(
+        List("v2_catalog.db.source_t", "v2_catalog.db.target_t", 
"v2_catalog.db.pivot_t"),
+        List("v2_catalog.db.target_t"),
+        List(
+          (
+            "v2_catalog.db.target_t.id",
+            Set("v2_catalog.db.source_t.id", "v2_catalog.db.target_t.id")),
+          (
+            "v2_catalog.db.target_t.name",
+            Set("v2_catalog.db.source_t.name", "v2_catalog.db.target_t.name")),
+          (
+            "v2_catalog.db.target_t.price",
+            Set("v2_catalog.db.pivot_t.price", 
"v2_catalog.db.target_t.price")))))
+
+      val ret3 = extractLineageWithoutExecuting(
+        "update v2_catalog.db.target_t AS set name='abc' where price < 10 ")
+      // For tables that do not support row-level deletion,
+      // duplicate data of the same group may be included when writing.
+      // plan is:
+      // ReplaceData
+      // +- Project [if ((price#1160 < cast(10 as float))) id#1158 else 
id#1158 AS id#1163,
+      //    if ((price#1160 < cast(10 as float))) abc else name#1159 AS 
name#1164,
+      //    if ((price#1160 < cast(10 as float))) price#1160 else price#1160 
AS price#1165,
+      //    _partition#1162]
+      // +- RelationV2[id#1158, name#1159, price#1160, _partition#1162]
+      //    v2_catalog.db.target_t v2_catalog.db.target_t
+      assert(ret3 == Lineage(
+        List("v2_catalog.db.target_t"),
+        List("v2_catalog.db.target_t"),
+        List(
+          (
+            "v2_catalog.db.target_t.id",
+            Set("v2_catalog.db.target_t.price", "v2_catalog.db.target_t.id")),
+          (
+            "v2_catalog.db.target_t.name",
+            Set("v2_catalog.db.target_t.price", 
"v2_catalog.db.target_t.name")),
+          ("v2_catalog.db.target_t.price", 
Set("v2_catalog.db.target_t.price")))))
+    }
+  }
+}
diff --git 
a/extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParserHelperSuite.scala
 
b/extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParserHelperSuite.scala
index 88ef055389..e3cda6959f 100644
--- 
a/extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParserHelperSuite.scala
+++ 
b/extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParserHelperSuite.scala
@@ -31,12 +31,13 @@ import org.apache.kyuubi.KyuubiFunSuite
 import org.apache.kyuubi.plugin.lineage.Lineage
 import 
org.apache.kyuubi.plugin.lineage.helper.SparkListenerHelper.SPARK_RUNTIME_VERSION
 
-class SparkSQLLineageParserHelperSuite extends KyuubiFunSuite
+abstract class SparkSQLLineageParserHelperSuite extends KyuubiFunSuite
   with SparkListenerExtensionTest {
 
-  val catalogName =
-    if (SPARK_RUNTIME_VERSION <= "3.1") 
"org.apache.spark.sql.connector.InMemoryTableCatalog"
+  def catalogName: String = {
+    if (SPARK_RUNTIME_VERSION <= "3.3") 
"org.apache.spark.sql.connector.InMemoryTableCatalog"
     else "org.apache.spark.sql.connector.catalog.InMemoryTableCatalog"
+  }
 
   val DEFAULT_CATALOG = LineageConf.DEFAULT_CATALOG
   override protected val catalogImpl: String = "hive"
@@ -169,65 +170,6 @@ class SparkSQLLineageParserHelperSuite extends 
KyuubiFunSuite
     }
   }
 
-  test("columns lineage extract - MergeIntoTable") {
-    val ddls =
-      """
-        |create table v2_catalog.db.target_t(id int, name string, price float)
-        |create table v2_catalog.db.source_t(id int, name string, price float)
-        |create table v2_catalog.db.pivot_t(id int, price float)
-        |""".stripMargin
-    ddls.split("\n").filter(_.nonEmpty).foreach(spark.sql(_).collect())
-    withTable("v2_catalog.db.target_t", "v2_catalog.db.source_t") { _ =>
-      val ret0 = extractLineageWithoutExecuting("MERGE INTO 
v2_catalog.db.target_t AS target " +
-        "USING v2_catalog.db.source_t AS source " +
-        "ON target.id = source.id " +
-        "WHEN MATCHED THEN " +
-        "  UPDATE SET target.name = source.name, target.price = source.price " 
+
-        "WHEN NOT MATCHED THEN " +
-        "  INSERT (id, name, price) VALUES (cast(source.id as int), 
source.name, source.price)")
-      assert(ret0 == Lineage(
-        List("v2_catalog.db.source_t"),
-        List("v2_catalog.db.target_t"),
-        List(
-          ("v2_catalog.db.target_t.id", Set("v2_catalog.db.source_t.id")),
-          ("v2_catalog.db.target_t.name", Set("v2_catalog.db.source_t.name")),
-          ("v2_catalog.db.target_t.price", 
Set("v2_catalog.db.source_t.price")))))
-
-      val ret1 = extractLineageWithoutExecuting("MERGE INTO 
v2_catalog.db.target_t AS target " +
-        "USING v2_catalog.db.source_t AS source " +
-        "ON target.id = source.id " +
-        "WHEN MATCHED THEN " +
-        "  UPDATE SET * " +
-        "WHEN NOT MATCHED THEN " +
-        "  INSERT *")
-      assert(ret1 == Lineage(
-        List("v2_catalog.db.source_t"),
-        List("v2_catalog.db.target_t"),
-        List(
-          ("v2_catalog.db.target_t.id", Set("v2_catalog.db.source_t.id")),
-          ("v2_catalog.db.target_t.name", Set("v2_catalog.db.source_t.name")),
-          ("v2_catalog.db.target_t.price", 
Set("v2_catalog.db.source_t.price")))))
-
-      val ret2 = extractLineageWithoutExecuting("MERGE INTO 
v2_catalog.db.target_t AS target " +
-        "USING (select a.id, a.name, b.price " +
-        "from v2_catalog.db.source_t a join v2_catalog.db.pivot_t b) AS source 
" +
-        "ON target.id = source.id " +
-        "WHEN MATCHED THEN " +
-        "  UPDATE SET * " +
-        "WHEN NOT MATCHED THEN " +
-        "  INSERT *")
-
-      assert(ret2 == Lineage(
-        List("v2_catalog.db.source_t", "v2_catalog.db.pivot_t"),
-        List("v2_catalog.db.target_t"),
-        List(
-          ("v2_catalog.db.target_t.id", Set("v2_catalog.db.source_t.id")),
-          ("v2_catalog.db.target_t.name", Set("v2_catalog.db.source_t.name")),
-          ("v2_catalog.db.target_t.price", 
Set("v2_catalog.db.pivot_t.price")))))
-    }
-
-  }
-
   test("columns lineage extract - CreateViewCommand") {
     withView("createviewcommand", "createviewcommand1", "createviewcommand2") 
{ _ =>
       val ret0 = extractLineage(
@@ -1451,32 +1393,36 @@ class SparkSQLLineageParserHelperSuite extends 
KyuubiFunSuite
   test("test directory to table") {
     val inputFile = getClass.getResource("/").getPath + "input_file"
     val sourceFile = File(inputFile).createFile()
-    spark.sql(
-      s"""
-         |CREATE OR REPLACE TEMPORARY VIEW temp_view (
-         | `a` STRING COMMENT '',
-         | `b` STRING COMMENT ''
-         |) USING csv OPTIONS(
-         |  sep='\t',
-         |  path='${sourceFile.path}'
-         |);
-         |""".stripMargin).collect()
-
-    val ret0 = extractLineageWithoutExecuting(
-      s"""
-         |INSERT OVERWRITE TABLE test_db.test_table_from_dir
-         |SELECT `a`, `b` FROM temp_view
-         |""".stripMargin)
+    withView("temp_view") { _ =>
+      {
+        spark.sql(
+          s"""
+             |CREATE OR REPLACE TEMPORARY VIEW temp_view (
+             | `a` STRING COMMENT '',
+             | `b` STRING COMMENT ''
+             |) USING csv OPTIONS(
+             |  sep='\t',
+             |  path='${sourceFile.path}'
+             |);
+             |""".stripMargin).collect()
+
+        val ret0 = extractLineageWithoutExecuting(
+          s"""
+             |INSERT OVERWRITE TABLE test_db.test_table_from_dir
+             |SELECT `a`, `b` FROM temp_view
+             |""".stripMargin)
 
-    assert(ret0 == Lineage(
-      List(),
-      List(s"spark_catalog.test_db.test_table_from_dir"),
-      List(
-        (s"spark_catalog.test_db.test_table_from_dir.a0", Set()),
-        (s"spark_catalog.test_db.test_table_from_dir.b0", Set()))))
+        assert(ret0 == Lineage(
+          List(),
+          List(s"spark_catalog.test_db.test_table_from_dir"),
+          List(
+            (s"spark_catalog.test_db.test_table_from_dir.a0", Set()),
+            (s"spark_catalog.test_db.test_table_from_dir.b0", Set()))))
+      }
+    }
   }
 
-  private def extractLineageWithoutExecuting(sql: String): Lineage = {
+  protected def extractLineageWithoutExecuting(sql: String): Lineage = {
     val parsed = spark.sessionState.sqlParser.parsePlan(sql)
     val analyzed = spark.sessionState.analyzer.execute(parsed)
     spark.sessionState.analyzer.checkAnalysis(analyzed)
diff --git 
a/extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/helper/TableCatalogLineageParserSuite.scala
 
b/extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/helper/TableCatalogLineageParserSuite.scala
new file mode 100644
index 0000000000..ea607452aa
--- /dev/null
+++ 
b/extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/helper/TableCatalogLineageParserSuite.scala
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.plugin.lineage.helper
+
+import org.apache.kyuubi.plugin.lineage.Lineage
+
+class TableCatalogLineageParserSuite extends SparkSQLLineageParserHelperSuite {
+
+  override def catalogName: String = {
+    "org.apache.spark.sql.connector.catalog.InMemoryTableCatalog"
+  }
+
+  test("columns lineage extract - MergeIntoTable") {
+    val ddls =
+      """
+        |create table v2_catalog.db.target_t(id int, name string, price float)
+        |create table v2_catalog.db.source_t(id int, name string, price float)
+        |create table v2_catalog.db.pivot_t(id int, price float)
+        |""".stripMargin
+    ddls.split("\n").filter(_.nonEmpty).foreach(spark.sql(_).collect())
+    withTable("v2_catalog.db.target_t", "v2_catalog.db.source_t", 
"v2_catalog.db.pivot_t") { _ =>
+      val ret0 = extractLineageWithoutExecuting("MERGE INTO 
v2_catalog.db.target_t AS target " +
+        "USING v2_catalog.db.source_t AS source " +
+        "ON target.id = source.id " +
+        "WHEN MATCHED THEN " +
+        "  UPDATE SET target.name = source.name, target.price = source.price " 
+
+        "WHEN NOT MATCHED THEN " +
+        "  INSERT (id, name, price) VALUES (cast(source.id as int), 
source.name, source.price)")
+      assert(ret0 == Lineage(
+        List("v2_catalog.db.source_t"),
+        List("v2_catalog.db.target_t"),
+        List(
+          ("v2_catalog.db.target_t.id", Set("v2_catalog.db.source_t.id")),
+          ("v2_catalog.db.target_t.name", Set("v2_catalog.db.source_t.name")),
+          ("v2_catalog.db.target_t.price", 
Set("v2_catalog.db.source_t.price")))))
+
+      val ret1 = extractLineageWithoutExecuting("MERGE INTO 
v2_catalog.db.target_t AS target " +
+        "USING v2_catalog.db.source_t AS source " +
+        "ON target.id = source.id " +
+        "WHEN MATCHED THEN " +
+        "  UPDATE SET * " +
+        "WHEN NOT MATCHED THEN " +
+        "  INSERT *")
+      assert(ret1 == Lineage(
+        List("v2_catalog.db.source_t"),
+        List("v2_catalog.db.target_t"),
+        List(
+          ("v2_catalog.db.target_t.id", Set("v2_catalog.db.source_t.id")),
+          ("v2_catalog.db.target_t.name", Set("v2_catalog.db.source_t.name")),
+          ("v2_catalog.db.target_t.price", 
Set("v2_catalog.db.source_t.price")))))
+
+      val ret2 = extractLineageWithoutExecuting("MERGE INTO 
v2_catalog.db.target_t AS target " +
+        "USING (select a.id, a.name, b.price " +
+        "from v2_catalog.db.source_t a join v2_catalog.db.pivot_t b) AS source 
" +
+        "ON target.id = source.id " +
+        "WHEN MATCHED THEN " +
+        "  UPDATE SET * " +
+        "WHEN NOT MATCHED THEN " +
+        "  INSERT *")
+
+      assert(ret2 == Lineage(
+        List("v2_catalog.db.source_t", "v2_catalog.db.pivot_t"),
+        List("v2_catalog.db.target_t"),
+        List(
+          ("v2_catalog.db.target_t.id", Set("v2_catalog.db.source_t.id")),
+          ("v2_catalog.db.target_t.name", Set("v2_catalog.db.source_t.name")),
+          ("v2_catalog.db.target_t.price", 
Set("v2_catalog.db.pivot_t.price")))))
+    }
+
+  }
+}

Reply via email to