This is an automated email from the ASF dual-hosted git repository.
wangzhen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new fb2b7ef9c6 [KYUUBI #7126][LINEAGE] Support merge into syntax in row
level catalog
fb2b7ef9c6 is described below
commit fb2b7ef9c6863a442bf7f29028d4497c65438408
Author: chenliang.lu <[email protected]>
AuthorDate: Wed Aug 13 17:35:41 2025 +0800
[KYUUBI #7126][LINEAGE] Support merge into syntax in row level catalog
### Why are the changes needed?
In a catalog that supports the row-level operation interface (Iceberg, etc.), MERGE INTO
will be rewritten as a WriteDelta or ReplaceData plan by an analyzer rule. We should support
the extraction of lineage relationships for these plan types.
### How was this patch tested?
Added new tests covering the row-level catalog.
### Was this patch authored or co-authored using generative AI tooling?
no
Closes #7127 from yabola/master-listener.
Closes #7126
e39d0d93b [chenliang.lu] add for notMatchedInstructions
1530d7c6c [chenliang.lu] add notMatchedInstructions
00660f83b [chenliang.lu] update spark version
db7d9ca85 [chenliang.lu] update spark version && optimize some code
85de5b069 [chenliang.lu] remove useless logger && fix ut
79d2f9bfa [chenliang.lu] fix check style
2f5e62cdd [chenliang.lu] [KYUUBI #7126][LINEAGE] Support merge into syntax
in row level catalog
Authored-by: chenliang.lu <[email protected]>
Signed-off-by: wforget <[email protected]>
---
extensions/spark/kyuubi-spark-lineage/README.md | 4 +-
.../helper/SparkSQLLineageParseHelper.scala | 46 +++++
.../helper/RowLevelCatalogLineageParserSuite.scala | 221 +++++++++++++++++++++
.../helper/SparkSQLLineageParserHelperSuite.scala | 116 +++--------
.../helper/TableCatalogLineageParserSuite.scala | 86 ++++++++
5 files changed, 385 insertions(+), 88 deletions(-)
diff --git a/extensions/spark/kyuubi-spark-lineage/README.md
b/extensions/spark/kyuubi-spark-lineage/README.md
index 3f24cd1730..6b3eeb902b 100644
--- a/extensions/spark/kyuubi-spark-lineage/README.md
+++ b/extensions/spark/kyuubi-spark-lineage/README.md
@@ -26,7 +26,7 @@
## Build
```shell
-build/mvn clean package -DskipTests -pl :kyuubi-spark-lineage_2.12 -am
-Dspark.version=3.2.1
+build/mvn clean package -DskipTests -pl :kyuubi-spark-lineage_2.12 -am
-Dspark.version=3.5.1
```
### Supported Apache Spark Versions
@@ -37,6 +37,4 @@ build/mvn clean package -DskipTests -pl
:kyuubi-spark-lineage_2.12 -am -Dspark.v
- [x] 3.5.x (default)
- [x] 3.4.x
- [x] 3.3.x
-- [x] 3.2.x
-- [x] 3.1.x
diff --git
a/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParseHelper.scala
b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParseHelper.scala
index a6cb6934f6..bcab9b74fe 100644
---
a/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParseHelper.scala
+++
b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParseHelper.scala
@@ -46,6 +46,9 @@ trait LineageParser {
val SUBQUERY_COLUMN_IDENTIFIER = "__subquery__"
val AGGREGATE_COUNT_COLUMN_IDENTIFIER = "__count__"
val LOCAL_TABLE_IDENTIFIER = "__local__"
+ val METADATA_COL_ATTR_KEY = "__metadata_col"
+ val ORIGINAL_ROW_ID_VALUE_PREFIX: String = "__original_row_id_"
+ val OPERATION_COLUMN: String = "__row_operation"
type AttributeMap[A] = ListMap[Attribute, A]
@@ -307,7 +310,37 @@ trait LineageParser {
extractColumnsLineage(getQuery(plan), parentColumnsLineage).map { case
(k, v) =>
k.withName(s"$table.${k.name}") -> v
}
+ case p if p.nodeName == "MergeRows" =>
+ val instructionsOutputs =
+ getField[Seq[Expression]](p, "matchedInstructions")
+ .map(extractInstructionOutputs) ++
+ getField[Seq[Expression]](p, "notMatchedInstructions")
+ .map(extractInstructionOutputs) ++
+ getField[Seq[Expression]](p, "notMatchedBySourceInstructions")
+ .map(extractInstructionOutputs)
+ val nextColumnsLineage = ListMap(p.output.indices.map { index =>
+ val keyAttr = p.output(index)
+ val instructionOutputs = instructionsOutputs.map(_(index))
+ (keyAttr, instructionOutputs)
+ }.collect {
+ case (keyAttr: Attribute, instructionsOutput)
+ if instructionsOutput
+ .exists(_.references.nonEmpty) =>
+ val attributeSet = AttributeSet.apply(instructionsOutput)
+ keyAttr -> attributeSet
+ }: _*)
+ p.children.map(
+ extractColumnsLineage(_,
nextColumnsLineage)).reduce(mergeColumnsLineage)
+ case p if p.nodeName == "WriteDelta" || p.nodeName == "ReplaceData" =>
+ val table = getV2TableName(getField[NamedRelation](plan, "table"))
+ val query = getQuery(plan)
+ val columnsLineage = extractColumnsLineage(query, parentColumnsLineage)
+ columnsLineage
+ .filter { case (k, _) => !isMetadataAttr(k) }
+ .map { case (k, v) =>
+ k.withName(s"$table.${k.name}") -> v
+ }
case p if p.nodeName == "MergeIntoTable" =>
val matchedActions = getField[Seq[MergeAction]](plan, "matchedActions")
val notMatchedActions = getField[Seq[MergeAction]](plan,
"notMatchedActions")
@@ -514,6 +547,19 @@ trait LineageParser {
case _ => qualifiedName
}
}
+
+ private def isMetadataAttr(attr: Attribute): Boolean = {
+ attr.metadata.contains(METADATA_COL_ATTR_KEY) ||
+ attr.name.startsWith(ORIGINAL_ROW_ID_VALUE_PREFIX) ||
+ attr.name.startsWith(OPERATION_COLUMN)
+ }
+
+ private def extractInstructionOutputs(instruction: Expression):
Seq[Expression] = {
+ instruction match {
+ case p if p.nodeName == "Split" => getField[Seq[Expression]](p,
"otherOutput")
+ case p => getField[Seq[Expression]](p, "output")
+ }
+ }
}
case class SparkSQLLineageParseHelper(sparkSession: SparkSession) extends
LineageParser
diff --git
a/extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/helper/RowLevelCatalogLineageParserSuite.scala
b/extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/helper/RowLevelCatalogLineageParserSuite.scala
new file mode 100644
index 0000000000..81b6d85e25
--- /dev/null
+++
b/extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/helper/RowLevelCatalogLineageParserSuite.scala
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.plugin.lineage.helper
+
+import org.apache.kyuubi.plugin.lineage.Lineage
+import
org.apache.kyuubi.plugin.lineage.helper.SparkListenerHelper.SPARK_RUNTIME_VERSION
+
+class RowLevelCatalogLineageParserSuite extends
SparkSQLLineageParserHelperSuite {
+
+ override def catalogName: String = {
+
"org.apache.spark.sql.connector.catalog.InMemoryRowLevelOperationTableCatalog"
+ }
+
+ test("columns lineage extract - WriteDelta") {
+ assume(
+ SPARK_RUNTIME_VERSION >= "3.5",
+ "WriteDelta is only supported in SPARK_RUNTIME_VERSION >= 3.5")
+ val ddls =
+ """
+ |create table v2_catalog.db.target_t(pk int not null, name string,
price float)
+ | TBLPROPERTIES ('supports-deltas'='true');
+ |create table v2_catalog.db.source_t(pk int not null, name string,
price float)
+ | TBLPROPERTIES ('supports-deltas'='true');
+ |create table v2_catalog.db.pivot_t(pk int not null, price float)
+ | TBLPROPERTIES ('supports-deltas'='true')
+ |""".stripMargin
+ ddls.split(";").filter(_.nonEmpty).foreach(spark.sql(_).collect())
+
+ withTable("v2_catalog.db.target_t", "v2_catalog.db.source_t",
"v2_catalog.db.pivot_t") { _ =>
+ val ret0 = extractLineageWithoutExecuting(
+ "MERGE INTO v2_catalog.db.target_t AS target " +
+ "USING v2_catalog.db.source_t AS source " +
+ "ON target.pk = source.pk " +
+ "WHEN MATCHED THEN " +
+ " UPDATE SET target.name = source.name, target.price = source.price
" +
+ "WHEN NOT MATCHED THEN " +
+ " INSERT (pk, name, price) VALUES (cast(source.pk as int),
source.name, source.price)" +
+ "WHEN NOT MATCHED BY SOURCE THEN UPDATE SET target.name = 'abc' ")
+ assert(ret0 == Lineage(
+ List("v2_catalog.db.source_t", "v2_catalog.db.target_t"),
+ List("v2_catalog.db.target_t"),
+ List(
+ (
+ "v2_catalog.db.target_t.pk",
+ Set("v2_catalog.db.source_t.pk", "v2_catalog.db.target_t.pk")),
+ ("v2_catalog.db.target_t.name", Set("v2_catalog.db.source_t.name")),
+ (
+ "v2_catalog.db.target_t.price",
+ Set("v2_catalog.db.source_t.price",
"v2_catalog.db.target_t.price")))))
+
+ val ret1 = extractLineageWithoutExecuting(
+ "MERGE INTO v2_catalog.db.target_t AS target " +
+ "USING v2_catalog.db.source_t AS source " +
+ "ON target.pk = source.pk " +
+ "WHEN MATCHED THEN " +
+ " UPDATE SET * " +
+ "WHEN NOT MATCHED THEN " +
+ " INSERT *")
+ assert(ret1 == Lineage(
+ List("v2_catalog.db.source_t"),
+ List("v2_catalog.db.target_t"),
+ List(
+ ("v2_catalog.db.target_t.pk", Set("v2_catalog.db.source_t.pk")),
+ ("v2_catalog.db.target_t.name", Set("v2_catalog.db.source_t.name")),
+ ("v2_catalog.db.target_t.price",
Set("v2_catalog.db.source_t.price")))))
+
+ val ret2 = extractLineageWithoutExecuting(
+ "MERGE INTO v2_catalog.db.target_t AS target " +
+ "USING (select a.pk, a.name, b.price " +
+ "from v2_catalog.db.source_t a join " +
+ "v2_catalog.db.pivot_t b) AS source " +
+ "ON target.pk = source.pk " +
+ "WHEN MATCHED THEN " +
+ " UPDATE SET * " +
+ "WHEN NOT MATCHED THEN " +
+ " INSERT *")
+
+ assert(ret2 == Lineage(
+ List("v2_catalog.db.source_t", "v2_catalog.db.pivot_t"),
+ List("v2_catalog.db.target_t"),
+ List(
+ ("v2_catalog.db.target_t.pk", Set("v2_catalog.db.source_t.pk")),
+ ("v2_catalog.db.target_t.name", Set("v2_catalog.db.source_t.name")),
+ ("v2_catalog.db.target_t.price",
Set("v2_catalog.db.pivot_t.price")))))
+
+ val ret3 = extractLineageWithoutExecuting(
+ "update v2_catalog.db.target_t AS set name='abc' where price < 10 ")
+ assert(ret3 == Lineage(
+ List("v2_catalog.db.target_t"),
+ List("v2_catalog.db.target_t"),
+ List(
+ ("v2_catalog.db.target_t.pk", Set("v2_catalog.db.target_t.pk")),
+ ("v2_catalog.db.target_t.name", Set()),
+ ("v2_catalog.db.target_t.price",
Set("v2_catalog.db.target_t.price")))))
+ }
+ }
+
+ test("columns lineage extract - ReplaceData") {
+ assume(
+ SPARK_RUNTIME_VERSION >= "3.5",
+ "ReplaceData[SPARK-43963] for merge into is supported in
SPARK_RUNTIME_VERSION >= 3.5")
+ val ddls =
+ """
+ |create table v2_catalog.db.target_t(id int, name string, price float)
+ |create table v2_catalog.db.source_t(id int, name string, price float)
+ |create table v2_catalog.db.pivot_t(id int, price float)
+ |""".stripMargin
+ ddls.split("\n").filter(_.nonEmpty).foreach(spark.sql(_).collect())
+ withTable("v2_catalog.db.target_t", "v2_catalog.db.source_t",
"v2_catalog.db.pivot_t") { _ =>
+ val ret0 = extractLineageWithoutExecuting("MERGE INTO
v2_catalog.db.target_t AS target " +
+ "USING v2_catalog.db.source_t AS source " +
+ "ON target.id = source.id " +
+ "WHEN MATCHED THEN " +
+ " UPDATE SET target.name = source.name, target.price = source.price "
+
+ "WHEN NOT MATCHED THEN " +
+ " INSERT (id, name, price) VALUES (cast(source.id as int),
source.name, source.price)")
+
+ /**
+ * The ReplaceData operation requires that target records which are read
but do not match
+ * any of the MATCHED or NOT MATCHED BY SOURCE clauses also be copied.
+ * (refer to [[RewriteMergeIntoTable#buildReplaceDataMergeRowsPlan]])
+ */
+ assert(ret0 == Lineage(
+ List("v2_catalog.db.source_t", "v2_catalog.db.target_t"),
+ List("v2_catalog.db.target_t"),
+ List(
+ (
+ "v2_catalog.db.target_t.id",
+ Set("v2_catalog.db.source_t.id", "v2_catalog.db.target_t.id")),
+ (
+ "v2_catalog.db.target_t.name",
+ Set("v2_catalog.db.source_t.name", "v2_catalog.db.target_t.name")),
+ (
+ "v2_catalog.db.target_t.price",
+ Set("v2_catalog.db.source_t.price",
"v2_catalog.db.target_t.price")))))
+
+ val ret1 = extractLineageWithoutExecuting("MERGE INTO
v2_catalog.db.target_t AS target " +
+ "USING v2_catalog.db.source_t AS source " +
+ "ON target.id = source.id " +
+ "WHEN MATCHED THEN " +
+ " UPDATE SET * " +
+ "WHEN NOT MATCHED THEN " +
+ " INSERT *")
+ assert(ret1 == Lineage(
+ List("v2_catalog.db.source_t", "v2_catalog.db.target_t"),
+ List("v2_catalog.db.target_t"),
+ List(
+ (
+ "v2_catalog.db.target_t.id",
+ Set("v2_catalog.db.source_t.id", "v2_catalog.db.target_t.id")),
+ (
+ "v2_catalog.db.target_t.name",
+ Set("v2_catalog.db.source_t.name", "v2_catalog.db.target_t.name")),
+ (
+ "v2_catalog.db.target_t.price",
+ Set("v2_catalog.db.source_t.price",
"v2_catalog.db.target_t.price")))))
+
+ val ret2 = extractLineageWithoutExecuting("MERGE INTO
v2_catalog.db.target_t AS target " +
+ "USING (select a.id, a.name, b.price " +
+ "from v2_catalog.db.source_t a join v2_catalog.db.pivot_t b) AS source
" +
+ "ON target.id = source.id " +
+ "WHEN MATCHED THEN " +
+ " UPDATE SET * " +
+ "WHEN NOT MATCHED THEN " +
+ " INSERT *")
+
+ assert(ret2 == Lineage(
+ List("v2_catalog.db.source_t", "v2_catalog.db.target_t",
"v2_catalog.db.pivot_t"),
+ List("v2_catalog.db.target_t"),
+ List(
+ (
+ "v2_catalog.db.target_t.id",
+ Set("v2_catalog.db.source_t.id", "v2_catalog.db.target_t.id")),
+ (
+ "v2_catalog.db.target_t.name",
+ Set("v2_catalog.db.source_t.name", "v2_catalog.db.target_t.name")),
+ (
+ "v2_catalog.db.target_t.price",
+ Set("v2_catalog.db.pivot_t.price",
"v2_catalog.db.target_t.price")))))
+
+ val ret3 = extractLineageWithoutExecuting(
+ "update v2_catalog.db.target_t AS set name='abc' where price < 10 ")
+ // For tables that do not support row-level deletion,
+ // duplicate data of the same group may be included when writing.
+ // plan is:
+ // ReplaceData
+ // +- Project [if ((price#1160 < cast(10 as float))) id#1158 else
id#1158 AS id#1163,
+ // if ((price#1160 < cast(10 as float))) abc else name#1159 AS
name#1164,
+ // if ((price#1160 < cast(10 as float))) price#1160 else price#1160
AS price#1165,
+ // _partition#1162]
+ // +- RelationV2[id#1158, name#1159, price#1160, _partition#1162]
+ // v2_catalog.db.target_t v2_catalog.db.target_t
+ assert(ret3 == Lineage(
+ List("v2_catalog.db.target_t"),
+ List("v2_catalog.db.target_t"),
+ List(
+ (
+ "v2_catalog.db.target_t.id",
+ Set("v2_catalog.db.target_t.price", "v2_catalog.db.target_t.id")),
+ (
+ "v2_catalog.db.target_t.name",
+ Set("v2_catalog.db.target_t.price",
"v2_catalog.db.target_t.name")),
+ ("v2_catalog.db.target_t.price",
Set("v2_catalog.db.target_t.price")))))
+ }
+ }
+}
diff --git
a/extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParserHelperSuite.scala
b/extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParserHelperSuite.scala
index 88ef055389..e3cda6959f 100644
---
a/extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParserHelperSuite.scala
+++
b/extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParserHelperSuite.scala
@@ -31,12 +31,13 @@ import org.apache.kyuubi.KyuubiFunSuite
import org.apache.kyuubi.plugin.lineage.Lineage
import
org.apache.kyuubi.plugin.lineage.helper.SparkListenerHelper.SPARK_RUNTIME_VERSION
-class SparkSQLLineageParserHelperSuite extends KyuubiFunSuite
+abstract class SparkSQLLineageParserHelperSuite extends KyuubiFunSuite
with SparkListenerExtensionTest {
- val catalogName =
- if (SPARK_RUNTIME_VERSION <= "3.1")
"org.apache.spark.sql.connector.InMemoryTableCatalog"
+ def catalogName: String = {
+ if (SPARK_RUNTIME_VERSION <= "3.3")
"org.apache.spark.sql.connector.InMemoryTableCatalog"
else "org.apache.spark.sql.connector.catalog.InMemoryTableCatalog"
+ }
val DEFAULT_CATALOG = LineageConf.DEFAULT_CATALOG
override protected val catalogImpl: String = "hive"
@@ -169,65 +170,6 @@ class SparkSQLLineageParserHelperSuite extends
KyuubiFunSuite
}
}
- test("columns lineage extract - MergeIntoTable") {
- val ddls =
- """
- |create table v2_catalog.db.target_t(id int, name string, price float)
- |create table v2_catalog.db.source_t(id int, name string, price float)
- |create table v2_catalog.db.pivot_t(id int, price float)
- |""".stripMargin
- ddls.split("\n").filter(_.nonEmpty).foreach(spark.sql(_).collect())
- withTable("v2_catalog.db.target_t", "v2_catalog.db.source_t") { _ =>
- val ret0 = extractLineageWithoutExecuting("MERGE INTO
v2_catalog.db.target_t AS target " +
- "USING v2_catalog.db.source_t AS source " +
- "ON target.id = source.id " +
- "WHEN MATCHED THEN " +
- " UPDATE SET target.name = source.name, target.price = source.price "
+
- "WHEN NOT MATCHED THEN " +
- " INSERT (id, name, price) VALUES (cast(source.id as int),
source.name, source.price)")
- assert(ret0 == Lineage(
- List("v2_catalog.db.source_t"),
- List("v2_catalog.db.target_t"),
- List(
- ("v2_catalog.db.target_t.id", Set("v2_catalog.db.source_t.id")),
- ("v2_catalog.db.target_t.name", Set("v2_catalog.db.source_t.name")),
- ("v2_catalog.db.target_t.price",
Set("v2_catalog.db.source_t.price")))))
-
- val ret1 = extractLineageWithoutExecuting("MERGE INTO
v2_catalog.db.target_t AS target " +
- "USING v2_catalog.db.source_t AS source " +
- "ON target.id = source.id " +
- "WHEN MATCHED THEN " +
- " UPDATE SET * " +
- "WHEN NOT MATCHED THEN " +
- " INSERT *")
- assert(ret1 == Lineage(
- List("v2_catalog.db.source_t"),
- List("v2_catalog.db.target_t"),
- List(
- ("v2_catalog.db.target_t.id", Set("v2_catalog.db.source_t.id")),
- ("v2_catalog.db.target_t.name", Set("v2_catalog.db.source_t.name")),
- ("v2_catalog.db.target_t.price",
Set("v2_catalog.db.source_t.price")))))
-
- val ret2 = extractLineageWithoutExecuting("MERGE INTO
v2_catalog.db.target_t AS target " +
- "USING (select a.id, a.name, b.price " +
- "from v2_catalog.db.source_t a join v2_catalog.db.pivot_t b) AS source
" +
- "ON target.id = source.id " +
- "WHEN MATCHED THEN " +
- " UPDATE SET * " +
- "WHEN NOT MATCHED THEN " +
- " INSERT *")
-
- assert(ret2 == Lineage(
- List("v2_catalog.db.source_t", "v2_catalog.db.pivot_t"),
- List("v2_catalog.db.target_t"),
- List(
- ("v2_catalog.db.target_t.id", Set("v2_catalog.db.source_t.id")),
- ("v2_catalog.db.target_t.name", Set("v2_catalog.db.source_t.name")),
- ("v2_catalog.db.target_t.price",
Set("v2_catalog.db.pivot_t.price")))))
- }
-
- }
-
test("columns lineage extract - CreateViewCommand") {
withView("createviewcommand", "createviewcommand1", "createviewcommand2")
{ _ =>
val ret0 = extractLineage(
@@ -1451,32 +1393,36 @@ class SparkSQLLineageParserHelperSuite extends
KyuubiFunSuite
test("test directory to table") {
val inputFile = getClass.getResource("/").getPath + "input_file"
val sourceFile = File(inputFile).createFile()
- spark.sql(
- s"""
- |CREATE OR REPLACE TEMPORARY VIEW temp_view (
- | `a` STRING COMMENT '',
- | `b` STRING COMMENT ''
- |) USING csv OPTIONS(
- | sep='\t',
- | path='${sourceFile.path}'
- |);
- |""".stripMargin).collect()
-
- val ret0 = extractLineageWithoutExecuting(
- s"""
- |INSERT OVERWRITE TABLE test_db.test_table_from_dir
- |SELECT `a`, `b` FROM temp_view
- |""".stripMargin)
+ withView("temp_view") { _ =>
+ {
+ spark.sql(
+ s"""
+ |CREATE OR REPLACE TEMPORARY VIEW temp_view (
+ | `a` STRING COMMENT '',
+ | `b` STRING COMMENT ''
+ |) USING csv OPTIONS(
+ | sep='\t',
+ | path='${sourceFile.path}'
+ |);
+ |""".stripMargin).collect()
+
+ val ret0 = extractLineageWithoutExecuting(
+ s"""
+ |INSERT OVERWRITE TABLE test_db.test_table_from_dir
+ |SELECT `a`, `b` FROM temp_view
+ |""".stripMargin)
- assert(ret0 == Lineage(
- List(),
- List(s"spark_catalog.test_db.test_table_from_dir"),
- List(
- (s"spark_catalog.test_db.test_table_from_dir.a0", Set()),
- (s"spark_catalog.test_db.test_table_from_dir.b0", Set()))))
+ assert(ret0 == Lineage(
+ List(),
+ List(s"spark_catalog.test_db.test_table_from_dir"),
+ List(
+ (s"spark_catalog.test_db.test_table_from_dir.a0", Set()),
+ (s"spark_catalog.test_db.test_table_from_dir.b0", Set()))))
+ }
+ }
}
- private def extractLineageWithoutExecuting(sql: String): Lineage = {
+ protected def extractLineageWithoutExecuting(sql: String): Lineage = {
val parsed = spark.sessionState.sqlParser.parsePlan(sql)
val analyzed = spark.sessionState.analyzer.execute(parsed)
spark.sessionState.analyzer.checkAnalysis(analyzed)
diff --git
a/extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/helper/TableCatalogLineageParserSuite.scala
b/extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/helper/TableCatalogLineageParserSuite.scala
new file mode 100644
index 0000000000..ea607452aa
--- /dev/null
+++
b/extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/helper/TableCatalogLineageParserSuite.scala
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.plugin.lineage.helper
+
+import org.apache.kyuubi.plugin.lineage.Lineage
+
+class TableCatalogLineageParserSuite extends SparkSQLLineageParserHelperSuite {
+
+ override def catalogName: String = {
+ "org.apache.spark.sql.connector.catalog.InMemoryTableCatalog"
+ }
+
+ test("columns lineage extract - MergeIntoTable") {
+ val ddls =
+ """
+ |create table v2_catalog.db.target_t(id int, name string, price float)
+ |create table v2_catalog.db.source_t(id int, name string, price float)
+ |create table v2_catalog.db.pivot_t(id int, price float)
+ |""".stripMargin
+ ddls.split("\n").filter(_.nonEmpty).foreach(spark.sql(_).collect())
+ withTable("v2_catalog.db.target_t", "v2_catalog.db.source_t",
"v2_catalog.db.pivot_t") { _ =>
+ val ret0 = extractLineageWithoutExecuting("MERGE INTO
v2_catalog.db.target_t AS target " +
+ "USING v2_catalog.db.source_t AS source " +
+ "ON target.id = source.id " +
+ "WHEN MATCHED THEN " +
+ " UPDATE SET target.name = source.name, target.price = source.price "
+
+ "WHEN NOT MATCHED THEN " +
+ " INSERT (id, name, price) VALUES (cast(source.id as int),
source.name, source.price)")
+ assert(ret0 == Lineage(
+ List("v2_catalog.db.source_t"),
+ List("v2_catalog.db.target_t"),
+ List(
+ ("v2_catalog.db.target_t.id", Set("v2_catalog.db.source_t.id")),
+ ("v2_catalog.db.target_t.name", Set("v2_catalog.db.source_t.name")),
+ ("v2_catalog.db.target_t.price",
Set("v2_catalog.db.source_t.price")))))
+
+ val ret1 = extractLineageWithoutExecuting("MERGE INTO
v2_catalog.db.target_t AS target " +
+ "USING v2_catalog.db.source_t AS source " +
+ "ON target.id = source.id " +
+ "WHEN MATCHED THEN " +
+ " UPDATE SET * " +
+ "WHEN NOT MATCHED THEN " +
+ " INSERT *")
+ assert(ret1 == Lineage(
+ List("v2_catalog.db.source_t"),
+ List("v2_catalog.db.target_t"),
+ List(
+ ("v2_catalog.db.target_t.id", Set("v2_catalog.db.source_t.id")),
+ ("v2_catalog.db.target_t.name", Set("v2_catalog.db.source_t.name")),
+ ("v2_catalog.db.target_t.price",
Set("v2_catalog.db.source_t.price")))))
+
+ val ret2 = extractLineageWithoutExecuting("MERGE INTO
v2_catalog.db.target_t AS target " +
+ "USING (select a.id, a.name, b.price " +
+ "from v2_catalog.db.source_t a join v2_catalog.db.pivot_t b) AS source
" +
+ "ON target.id = source.id " +
+ "WHEN MATCHED THEN " +
+ " UPDATE SET * " +
+ "WHEN NOT MATCHED THEN " +
+ " INSERT *")
+
+ assert(ret2 == Lineage(
+ List("v2_catalog.db.source_t", "v2_catalog.db.pivot_t"),
+ List("v2_catalog.db.target_t"),
+ List(
+ ("v2_catalog.db.target_t.id", Set("v2_catalog.db.source_t.id")),
+ ("v2_catalog.db.target_t.name", Set("v2_catalog.db.source_t.name")),
+ ("v2_catalog.db.target_t.price",
Set("v2_catalog.db.pivot_t.price")))))
+ }
+
+ }
+}