This is an automated email from the ASF dual-hosted git repository. codope pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push: new 108a885b4db [HUDI-7294] TVF to query hudi metadata (#10491) 108a885b4db is described below commit 108a885b4db62f08d30ede47805b8b44c35ab1e6 Author: bhat-vinay <152183592+bhat-vi...@users.noreply.github.com> AuthorDate: Wed Jan 17 08:21:07 2024 +0530 [HUDI-7294] TVF to query hudi metadata (#10491) Adds a TVF function to query hudi metadata through spark-sql. Since the metadata is already a MOR table, it simply creates a 'snapshot' on a MOR relation. Could not find any way to format (or filter) the RDD generated by the MOR snapshot relation. Uploading the PR to get some feedback. Co-authored-by: Vinaykumar Bhat <vi...@onehouse.ai> --- .../sql/hudi/TestHoodieTableValuedFunction.scala | 68 ++++++++++++++++++++++ .../logcal/HoodieMetadataTableValuedFunction.scala | 46 +++++++++++++++ .../hudi/analysis/HoodieSpark32PlusAnalysis.scala | 17 +++++- .../sql/hudi/analysis/TableValuedFunctions.scala | 7 ++- 4 files changed, 136 insertions(+), 2 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieTableValuedFunction.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieTableValuedFunction.scala index 867e83c301e..bdf512d3451 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieTableValuedFunction.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieTableValuedFunction.scala @@ -21,6 +21,8 @@ import org.apache.hudi.DataSourceWriteOptions.SPARK_SQL_INSERT_INTO_OPERATION import org.apache.hudi.HoodieSparkUtils import org.apache.spark.sql.functions.{col, from_json} +import scala.collection.Seq + class TestHoodieTableValuedFunction extends HoodieSparkSqlTestBase { test(s"Test hudi_query Table-Valued Function") { @@ -558,4 +560,70 @@ class TestHoodieTableValuedFunction extends HoodieSparkSqlTestBase { } } } + /** Checks hudi_metadata row counts per metadata record type on a freshly written COW table and MOR table that have the record index and column stats (on price) enabled; the body only runs on Spark 3.2 or newer. NOTE(review): unsetConf runs after the loop and outside any finally block, so a failing assertion leaves SPARK_SQL_INSERT_INTO_OPERATION=upsert set on the session. */ + test(s"Test 
hudi_metadata Table-Valued Function") { + if (HoodieSparkUtils.gteqSpark3_2) { + withTempDir { tmp => + Seq("cow", "mor").foreach { tableType => + val tableName = generateTableName + val identifier = tableName + spark.sql("set " + SPARK_SQL_INSERT_INTO_OPERATION.key + "=upsert") + spark.sql( + s""" + |create table $tableName ( + | id int, + | name string, + | ts long, + | price int + |) using hudi + |partitioned by (price) + |tblproperties ( + | type = '$tableType', + | primaryKey = 'id', + | preCombineField = 'ts', + | hoodie.datasource.write.recordkey.field = 'id', + | hoodie.metadata.record.index.enable = 'true', + | hoodie.metadata.index.column.stats.enable = 'true', + | hoodie.metadata.index.column.stats.column.list = 'price' + |) + |location '${tmp.getCanonicalPath}/$tableName' + |""".stripMargin + ) + /* Three records, one per price partition (10, 20 and 30). */ + spark.sql( + s""" + | insert into $tableName + | values (1, 'a1', 1000, 10), (2, 'a2', 2000, 20), (3, 'a3', 3000, 30) + | """.stripMargin + ) + /* type=1, read via filesystemmetadata: exactly one row expected (presumably the all-partitions listing - TODO confirm). */ + val result2DF = spark.sql( + s"select type, key, filesystemmetadata from hudi_metadata('$identifier') where type=1" + ) + assert(result2DF.count() == 1) + /* type=2, also filesystemmetadata: three rows expected, presumably one file listing per price partition. */ + val result3DF = spark.sql( + s"select type, key, filesystemmetadata from hudi_metadata('$identifier') where type=2" + ) + assert(result3DF.count() == 3) + /* type=3, ColumnStatsMetadata: three rows expected; tblproperties list only price for column stats. */ + val result4DF = spark.sql( + s"select type, key, ColumnStatsMetadata from hudi_metadata('$identifier') where type=3" + ) + assert(result4DF.count() == 3) + /* type=5, recordIndexMetadata: three rows expected, presumably one per inserted record key (ids 1 to 3). */ + val result5DF = spark.sql( + s"select type, key, recordIndexMetadata from hudi_metadata('$identifier') where type=5" + ) + assert(result5DF.count() == 3) + /* tblproperties configure no bloom filter index, so no row is expected to carry BloomFilterMetadata. */ + val result6DF = spark.sql( + s"select type, key, BloomFilterMetadata from hudi_metadata('$identifier') where BloomFilterMetadata is not null" + ) + assert(result6DF.count() == 0) + } + } + } + spark.sessionState.conf.unsetConf(SPARK_SQL_INSERT_INTO_OPERATION.key) + } } diff --git 
a/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/catalyst/plans/logcal/HoodieMetadataTableValuedFunction.scala b/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/catalyst/plans/logcal/HoodieMetadataTableValuedFunction.scala new file mode 100644 index 00000000000..c4eca4bd0a9 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/catalyst/plans/logcal/HoodieMetadataTableValuedFunction.scala @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.catalyst.plans.logcal + +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression} +import org.apache.spark.sql.catalyst.plans.logical.LeafNode +/** Companion of the hudi_metadata table-valued function: holds the SQL function name and the argument parser. */ +object HoodieMetadataTableValuedFunction { + /** Name under which the function is registered with Spark SQL. */ + val FUNC_NAME = "hudi_metadata"; + /** Parses hudi_metadata(...) arguments. Every expression is evaluated eagerly, and exactly one argument (a table name or a table path) is required. Returns that argument paired with options that select a snapshot query. funcName is used only in the error message. @throws AnalysisException if the argument count is not exactly one. */ + def parseOptions(exprs: Seq[Expression], funcName: String): (String, Map[String, String]) = { + val args = exprs.map(_.eval().toString) /* NOTE(review): a null-valued argument makes toString throw a NullPointerException instead of an AnalysisException. */ + if (args.size != 1) { + throw new AnalysisException(s"Expect arguments (table_name or table_path) for function `$funcName`") + } + + val identifier = args.head + + (identifier, Map("hoodie.datasource.query.type" -> "snapshot")) + } +} +/** Unresolved logical-plan placeholder for hudi_metadata(...) that carries the raw argument expressions; HoodieSpark32PlusResolveReferences rewrites it into a LogicalRelation over the table's .hoodie/metadata path. */ +case class HoodieMetadataTableValuedFunction(args: Seq[Expression]) extends LeafNode { + /** Reports no output attributes and resolved = false, so the analyzer treats the node as unresolved until a rule replaces it. */ + override def output: Seq[Attribute] = Nil + + override lazy val resolved: Boolean = false +} + diff --git a/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieSpark32PlusAnalysis.scala b/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieSpark32PlusAnalysis.scala index bc8edc72295..9c2f5bfb58c 100644 --- a/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieSpark32PlusAnalysis.scala +++ b/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieSpark32PlusAnalysis.scala @@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.analysis.SimpleAnalyzer.resolveExpressionBy import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, NamedRelation, ResolvedFieldName, UnresolvedAttribute, UnresolvedFieldName, UnresolvedPartitionSpec} import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogUtils} import org.apache.spark.sql.catalyst.expressions.Expression -import org.apache.spark.sql.catalyst.plans.logcal.{HoodieFileSystemViewTableValuedFunction, 
HoodieFileSystemViewTableValuedFunctionOptionsParser, HoodieQuery, HoodieTableChanges, HoodieTableChangesOptionsParser, HoodieTimelineTableValuedFunction, HoodieTimelineTableValuedFunctionOptionsParser} +import org.apache.spark.sql.catalyst.plans.logcal.{HoodieFileSystemViewTableValuedFunction, HoodieFileSystemViewTableValuedFunctionOptionsParser, HoodieMetadataTableValuedFunction, HoodieQuery, HoodieTableChanges, HoodieTableChangesOptionsParser, HoodieTimelineTableValuedFunction, HoodieTimelineTableValuedFunctionOptionsParser} import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.catalyst.trees.Origin @@ -134,6 +134,21 @@ case class HoodieSpark32PlusResolveReferences(spark: SparkSession) extends Rule[ catalogTable.location.toString)) LogicalRelation(relation, catalogTable) } + case HoodieMetadataTableValuedFunction(args) => + val (tablePath, opts) = HoodieMetadataTableValuedFunction.parseOptions(args, HoodieMetadataTableValuedFunction.FUNC_NAME) + val hoodieDataSource = new DefaultSource + if (tablePath.contains(Path.SEPARATOR)) { + // the first param is table path + val relation = hoodieDataSource.createRelation(spark.sqlContext, opts ++ Map("path" -> (tablePath + "/.hoodie/metadata"))) + LogicalRelation(relation) + } else { + // the first param is table identifier + val tableId = spark.sessionState.sqlParser.parseTableIdentifier(tablePath) + val catalogTable = spark.sessionState.catalog.getTableMetadata(tableId) + val relation = hoodieDataSource.createRelation(spark.sqlContext, opts ++ Map("path" -> + (catalogTable.location.toString + "/.hoodie/metadata"))) + LogicalRelation(relation, catalogTable) + } case mO@MatchMergeIntoTable(targetTableO, sourceTableO, _) // START: custom Hudi change: don't want to go to the spark mit resolution so we resolve the source and target // if they haven't been diff --git 
a/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/analysis/TableValuedFunctions.scala b/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/analysis/TableValuedFunctions.scala index b5f71389fab..e87a6de8db9 100644 --- a/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/analysis/TableValuedFunctions.scala +++ b/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/analysis/TableValuedFunctions.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.hudi.analysis import org.apache.spark.sql.catalyst.FunctionIdentifier import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionInfo} -import org.apache.spark.sql.catalyst.plans.logcal.{HoodieFileSystemViewTableValuedFunction, HoodieQuery, HoodieTableChanges, HoodieTimelineTableValuedFunction} +import org.apache.spark.sql.catalyst.plans.logcal.{HoodieFileSystemViewTableValuedFunction, HoodieMetadataTableValuedFunction, HoodieQuery, HoodieTableChanges, HoodieTimelineTableValuedFunction} object TableValuedFunctions { @@ -43,6 +43,11 @@ object TableValuedFunctions { FunctionIdentifier(HoodieFileSystemViewTableValuedFunction.FUNC_NAME), new ExpressionInfo(HoodieFileSystemViewTableValuedFunction.getClass.getCanonicalName, HoodieFileSystemViewTableValuedFunction.FUNC_NAME), (args: Seq[Expression]) => new HoodieFileSystemViewTableValuedFunction(args) + ), + ( + FunctionIdentifier(HoodieMetadataTableValuedFunction.FUNC_NAME), + new ExpressionInfo(HoodieMetadataTableValuedFunction.getClass.getCanonicalName, HoodieMetadataTableValuedFunction.FUNC_NAME), + (args: Seq[Expression]) => new HoodieMetadataTableValuedFunction(args) ) ) }