This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 108a885b4db [HUDI-7294] TVF to query hudi metadata (#10491)
108a885b4db is described below

commit 108a885b4db62f08d30ede47805b8b44c35ab1e6
Author: bhat-vinay <152183592+bhat-vi...@users.noreply.github.com>
AuthorDate: Wed Jan 17 08:21:07 2024 +0530

    [HUDI-7294] TVF to query hudi metadata (#10491)
    
    Adds a TVF (table-valued function) to query hudi metadata through spark-sql. Since the 
metadata is already a MOR table, it simply creates a 'snapshot' on
    a MOR relation. Could not find any way to format (or filter) the RDD 
generated by the MOR snapshot relation. Uploading the PR to get some feedback.
    
    Co-authored-by: Vinaykumar Bhat <vi...@onehouse.ai>
---
 .../sql/hudi/TestHoodieTableValuedFunction.scala   | 68 ++++++++++++++++++++++
 .../logcal/HoodieMetadataTableValuedFunction.scala | 46 +++++++++++++++
 .../hudi/analysis/HoodieSpark32PlusAnalysis.scala  | 17 +++++-
 .../sql/hudi/analysis/TableValuedFunctions.scala   |  7 ++-
 4 files changed, 136 insertions(+), 2 deletions(-)

diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieTableValuedFunction.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieTableValuedFunction.scala
index 867e83c301e..bdf512d3451 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieTableValuedFunction.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieTableValuedFunction.scala
@@ -21,6 +21,8 @@ import 
org.apache.hudi.DataSourceWriteOptions.SPARK_SQL_INSERT_INTO_OPERATION
 import org.apache.hudi.HoodieSparkUtils
 import org.apache.spark.sql.functions.{col, from_json}
 
+import scala.collection.Seq
+
 class TestHoodieTableValuedFunction extends HoodieSparkSqlTestBase {
 
   test(s"Test hudi_query Table-Valued Function") {
@@ -558,4 +560,70 @@ class TestHoodieTableValuedFunction extends 
HoodieSparkSqlTestBase {
       }
     }
   }
+
+  test(s"Test hudi_metadata Table-Valued Function") {
+    if (HoodieSparkUtils.gteqSpark3_2) {
+      withTempDir { tmp =>
+        Seq("cow", "mor").foreach { tableType =>
+          val tableName = generateTableName
+          val identifier = tableName
+          spark.sql("set " + SPARK_SQL_INSERT_INTO_OPERATION.key + "=upsert")
+          spark.sql(
+            s"""
+               |create table $tableName (
+               |  id int,
+               |  name string,
+               |  ts long,
+               |  price int
+               |) using hudi
+               |partitioned by (price)
+               |tblproperties (
+               |  type = '$tableType',
+               |  primaryKey = 'id',
+               |  preCombineField = 'ts',
+               |  hoodie.datasource.write.recordkey.field = 'id',
+               |  hoodie.metadata.record.index.enable = 'true',
+               |  hoodie.metadata.index.column.stats.enable = 'true',
+               |  hoodie.metadata.index.column.stats.column.list = 'price'
+               |)
+               |location '${tmp.getCanonicalPath}/$tableName'
+               |""".stripMargin
+          )
+
+          spark.sql(
+            s"""
+               | insert into $tableName
+               | values (1, 'a1', 1000, 10), (2, 'a2', 2000, 20), (3, 'a3', 
3000, 30)
+               | """.stripMargin
+          )
+
+          val result2DF = spark.sql(
+            s"select type, key, filesystemmetadata from 
hudi_metadata('$identifier') where type=1"
+          )
+          assert(result2DF.count() == 1)
+
+          val result3DF = spark.sql(
+            s"select type, key, filesystemmetadata from 
hudi_metadata('$identifier') where type=2"
+          )
+          assert(result3DF.count() == 3)
+
+          val result4DF = spark.sql(
+            s"select type, key, ColumnStatsMetadata from 
hudi_metadata('$identifier') where type=3"
+          )
+          assert(result4DF.count() == 3)
+
+          val result5DF = spark.sql(
+            s"select type, key, recordIndexMetadata from 
hudi_metadata('$identifier') where type=5"
+          )
+          assert(result5DF.count() == 3)
+
+          val result6DF = spark.sql(
+            s"select type, key, BloomFilterMetadata from 
hudi_metadata('$identifier') where BloomFilterMetadata is not null"
+          )
+          assert(result6DF.count() == 0)
+        }
+      }
+    }
+    spark.sessionState.conf.unsetConf(SPARK_SQL_INSERT_INTO_OPERATION.key)
+  }
 }
diff --git 
a/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/catalyst/plans/logcal/HoodieMetadataTableValuedFunction.scala
 
b/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/catalyst/plans/logcal/HoodieMetadataTableValuedFunction.scala
new file mode 100644
index 00000000000..c4eca4bd0a9
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/catalyst/plans/logcal/HoodieMetadataTableValuedFunction.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.logcal
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
+import org.apache.spark.sql.catalyst.plans.logical.LeafNode
+
+object HoodieMetadataTableValuedFunction {
+
+  val FUNC_NAME = "hudi_metadata";
+
+  def parseOptions(exprs: Seq[Expression], funcName: String): (String, 
Map[String, String]) = {
+    val args = exprs.map(_.eval().toString)
+    if (args.size != 1) {
+      throw new AnalysisException(s"Expect arguments (table_name or 
table_path) for function `$funcName`")
+    }
+
+    val identifier = args.head
+
+    (identifier, Map("hoodie.datasource.query.type" -> "snapshot"))
+  }
+}
+
+case class HoodieMetadataTableValuedFunction(args: Seq[Expression]) extends 
LeafNode {
+
+  override def output: Seq[Attribute] = Nil
+
+  override lazy val resolved: Boolean = false
+}
+
diff --git 
a/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieSpark32PlusAnalysis.scala
 
b/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieSpark32PlusAnalysis.scala
index bc8edc72295..9c2f5bfb58c 100644
--- 
a/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieSpark32PlusAnalysis.scala
+++ 
b/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieSpark32PlusAnalysis.scala
@@ -24,7 +24,7 @@ import 
org.apache.spark.sql.catalyst.analysis.SimpleAnalyzer.resolveExpressionBy
 import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, 
NamedRelation, ResolvedFieldName, UnresolvedAttribute, UnresolvedFieldName, 
UnresolvedPartitionSpec}
 import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogUtils}
 import org.apache.spark.sql.catalyst.expressions.Expression
-import 
org.apache.spark.sql.catalyst.plans.logcal.{HoodieFileSystemViewTableValuedFunction,
 HoodieFileSystemViewTableValuedFunctionOptionsParser, HoodieQuery, 
HoodieTableChanges, HoodieTableChangesOptionsParser, 
HoodieTimelineTableValuedFunction, 
HoodieTimelineTableValuedFunctionOptionsParser}
+import 
org.apache.spark.sql.catalyst.plans.logcal.{HoodieFileSystemViewTableValuedFunction,
 HoodieFileSystemViewTableValuedFunctionOptionsParser, 
HoodieMetadataTableValuedFunction, HoodieQuery, HoodieTableChanges, 
HoodieTableChangesOptionsParser, HoodieTimelineTableValuedFunction, 
HoodieTimelineTableValuedFunctionOptionsParser}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.trees.Origin
@@ -134,6 +134,21 @@ case class HoodieSpark32PlusResolveReferences(spark: 
SparkSession) extends Rule[
           catalogTable.location.toString))
         LogicalRelation(relation, catalogTable)
       }
+    case HoodieMetadataTableValuedFunction(args) =>
+      val (tablePath, opts) = 
HoodieMetadataTableValuedFunction.parseOptions(args, 
HoodieMetadataTableValuedFunction.FUNC_NAME)
+      val hoodieDataSource = new DefaultSource
+      if (tablePath.contains(Path.SEPARATOR)) {
+        // the first param is table path
+        val relation = hoodieDataSource.createRelation(spark.sqlContext, opts 
++ Map("path" -> (tablePath + "/.hoodie/metadata")))
+        LogicalRelation(relation)
+      } else {
+        // the first param is table identifier
+        val tableId = 
spark.sessionState.sqlParser.parseTableIdentifier(tablePath)
+        val catalogTable = spark.sessionState.catalog.getTableMetadata(tableId)
+        val relation = hoodieDataSource.createRelation(spark.sqlContext, opts 
++ Map("path" ->
+          (catalogTable.location.toString + "/.hoodie/metadata")))
+        LogicalRelation(relation, catalogTable)
+      }
     case mO@MatchMergeIntoTable(targetTableO, sourceTableO, _)
       // START: custom Hudi change: don't want to go to the spark mit 
resolution so we resolve the source and target
       // if they haven't been
diff --git 
a/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/analysis/TableValuedFunctions.scala
 
b/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/analysis/TableValuedFunctions.scala
index b5f71389fab..e87a6de8db9 100644
--- 
a/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/analysis/TableValuedFunctions.scala
+++ 
b/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/analysis/TableValuedFunctions.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.hudi.analysis
 
 import org.apache.spark.sql.catalyst.FunctionIdentifier
 import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionInfo}
-import 
org.apache.spark.sql.catalyst.plans.logcal.{HoodieFileSystemViewTableValuedFunction,
 HoodieQuery, HoodieTableChanges, HoodieTimelineTableValuedFunction}
+import 
org.apache.spark.sql.catalyst.plans.logcal.{HoodieFileSystemViewTableValuedFunction,
 HoodieMetadataTableValuedFunction, HoodieQuery, HoodieTableChanges, 
HoodieTimelineTableValuedFunction}
 
 object TableValuedFunctions {
 
@@ -43,6 +43,11 @@ object TableValuedFunctions {
       FunctionIdentifier(HoodieFileSystemViewTableValuedFunction.FUNC_NAME),
       new 
ExpressionInfo(HoodieFileSystemViewTableValuedFunction.getClass.getCanonicalName,
 HoodieFileSystemViewTableValuedFunction.FUNC_NAME),
       (args: Seq[Expression]) => new 
HoodieFileSystemViewTableValuedFunction(args)
+    ),
+    (
+      FunctionIdentifier(HoodieMetadataTableValuedFunction.FUNC_NAME),
+      new 
ExpressionInfo(HoodieMetadataTableValuedFunction.getClass.getCanonicalName, 
HoodieMetadataTableValuedFunction.FUNC_NAME),
+      (args: Seq[Expression]) => new HoodieMetadataTableValuedFunction(args)
     )
   )
 }

Reply via email to