This is an automated email from the ASF dual-hosted git repository. codope pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push: new 12c26345f7c [HUDI-7261] TVF to query hudi table's filesystem state through spark-sql (#10414) 12c26345f7c is described below commit 12c26345f7c92b27592fc2f52b433b8b0b37e9a7 Author: bhat-vinay <152183592+bhat-vi...@users.noreply.github.com> AuthorDate: Wed Jan 3 12:34:47 2024 +0530 [HUDI-7261] TVF to query hudi table's filesystem state through spark-sql (#10414) A new TVF, `hudi_filesystem_view(...)`, is added to support querying the table's file-system view through spark-sql. The information displayed is similar to the 'fsview' command of hudi-cli. Co-authored-by: Sagar Sumit <sagarsumi...@gmail.com> --- .../scala/org/apache/hudi/DataSourceOptions.scala | 15 +++ .../main/scala/org/apache/hudi/DefaultSource.scala | 5 + .../scala/org/apache/hudi/FileSystemRelation.scala | 137 +++++++++++++++++++++ .../sql/hudi/TestHoodieTableValuedFunction.scala | 63 ++++++++++ .../HoodieFileSystemViewTableValuedFunction.scala | 64 ++++++++++ .../hudi/analysis/HoodieSpark32PlusAnalysis.scala | 17 ++- .../sql/hudi/analysis/TableValuedFunctions.scala | 7 +- 7 files changed, 306 insertions(+), 2 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala index b99db9725e2..828e83fd106 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala @@ -226,6 +226,21 @@ object DataSourceReadOptions { .withDocumentation("When this is set, the result set of the table valued function hudi_query_timeline(...)" + " will include archived timeline") + val CREATE_FILESYSTEM_RELATION: ConfigProperty[String] = ConfigProperty + .key("hoodie.datasource.read.create.filesystem.relation") + .defaultValue("false") + .markAdvanced() + 
.sinceVersion("1.0.0") + .withDocumentation("When this is set, the relation created by DefaultSource is for a view representing" + + " the result set of the table valued function hudi_filesystem_view(...)") + + val FILESYSTEM_RELATION_ARG_SUBPATH: ConfigProperty[String] = + ConfigProperty.key("hoodie.datasource.read.table.valued.function.filesystem.relation.subpath") + .defaultValue("") + .markAdvanced() + .sinceVersion("1.0.0") + .withDocumentation("A regex under the table's base path to get file system view information") + /** @deprecated Use {@link QUERY_TYPE} and its methods instead */ @Deprecated val QUERY_TYPE_OPT_KEY = QUERY_TYPE.key() diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala index 87d527fdfe2..601c5e6f526 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala @@ -224,9 +224,14 @@ object DefaultSource { parameters.get(INCREMENTAL_FORMAT.key).contains(INCREMENTAL_FORMAT_CDC_VAL) val isMultipleBaseFileFormatsEnabled = metaClient.getTableConfig.isMultipleBaseFileFormatsEnabled + val createTimeLineRln = parameters.get(DataSourceReadOptions.CREATE_TIMELINE_RELATION.key()) + val createFSRln = parameters.get(DataSourceReadOptions.CREATE_FILESYSTEM_RELATION.key()) + if (createTimeLineRln.isDefined) { new TimelineRelation(sqlContext, parameters, metaClient) + } else if (createFSRln.isDefined) { + new FileSystemRelation(sqlContext, parameters, metaClient) } else { log.info(s"Is bootstrapped table => $isBootstrappedTable, tableType is: $tableType, queryType is: $queryType") diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/FileSystemRelation.scala 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/FileSystemRelation.scala new file mode 100644 index 00000000000..90e6919e4fa --- /dev/null +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/FileSystemRelation.scala @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi + + +import org.apache.hadoop.fs.{FileStatus, Path} +import org.apache.hudi.common.fs.FSUtils +import org.apache.hudi.common.model.{FileSlice, HoodieFileGroup, HoodieLogFile} +import org.apache.hudi.common.table.HoodieTableMetaClient +import org.apache.hudi.common.table.view.HoodieTableFileSystemView +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.sources.{BaseRelation, TableScan} +import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType} +import org.apache.spark.sql.{Row, SQLContext} +import org.slf4j.LoggerFactory + +import java.util.function.{Consumer, Predicate, ToLongFunction} +import scala.collection.JavaConversions + +/** + * Relation to implement the Hoodie's file-system view for the table + * valued function hudi_filesystem_view(...). 
+ * + * The relation implements a simple buildScan() routine and does not support + * any filtering primitives. Any column or predicate filtering needs to be done + * explicitly by the execution layer. + * + */ +class FileSystemRelation(val sqlContext: SQLContext, + val optParams: Map[String, String], + val metaClient: HoodieTableMetaClient) extends BaseRelation with TableScan { + + private val log = LoggerFactory.getLogger(classOf[FileSystemRelation]) + + // The schema for the FileSystemRelation view + override def schema: StructType = StructType(Array( + StructField("File_ID", StringType, nullable = true), + StructField("Partition_Path", StringType, nullable = true), + StructField("Base_Instant_Time", StringType, nullable = true), + StructField("Base_File_Path", StringType, nullable = true), + StructField("Base_File_Size", LongType, nullable = true), + StructField("Log_File_Count", LongType, nullable = true), + StructField("Log_File_Size", LongType, nullable = true), + StructField("Log_File_Scheduled", LongType, nullable = true), + StructField("Log_File_Unscheduled", LongType, nullable = true) + )) + + // The buildScan(...) 
method implementation from TableScan + // This builds the dataframe containing all the columns for the FileSystemView + override def buildScan(): RDD[Row] = { + val data = collection.mutable.ArrayBuffer[Row]() + val subPath = optParams.getOrElse(DataSourceReadOptions.FILESYSTEM_RELATION_ARG_SUBPATH.key(), "") + val path = String.format("%s/%s/*", metaClient.getBasePathV2, subPath) + val fileStatusList = FSUtils.getGlobStatusExcludingMetaFolder(metaClient.getFs, new Path(path)) + + + val fsView = new HoodieTableFileSystemView(metaClient, metaClient.getActiveTimeline.getWriteTimeline, fileStatusList.toArray(new Array[FileStatus](0))) + val fileGroups = fsView.getAllFileGroups + + fileGroups.forEach(toJavaConsumer((fg: HoodieFileGroup) => { + fg.getAllFileSlices.forEach(toJavaConsumer((fs: FileSlice) => { + val logFileSize = fs.getLogFiles.mapToLong(toJavaLongFunction((lf: HoodieLogFile) => { + lf.getFileSize + })).sum() + + val logFileCompactionSize = fs.getLogFiles.filter(toJavaPredicate((lf: HoodieLogFile) => { + lf.getDeltaCommitTime == fs.getBaseInstantTime + })).mapToLong(toJavaLongFunction((lf: HoodieLogFile) => { + lf.getFileSize + })).sum() + + val logFileNonCompactionSize = fs.getLogFiles.filter(toJavaPredicate((lf: HoodieLogFile) => { + lf.getDeltaCommitTime != fs.getBaseInstantTime + })).mapToLong(toJavaLongFunction((lf: HoodieLogFile) => { + lf.getFileSize + })).sum() + + + val r = Row( + fg.getFileGroupId.getFileId, + fg.getPartitionPath, + fs.getBaseInstantTime, + if (fs.getBaseFile.isPresent) fs.getBaseFile.get.getPath else "", + if (fs.getBaseFile.isPresent) fs.getBaseFile.get.getFileSize else -1, + fs.getLogFiles.count, + logFileSize, + logFileCompactionSize, + logFileNonCompactionSize + ) + data += r + })) + })) + + // Using deprecated `JavaConversions` to be compatible with scala versions < 2.12. + // Can replace with JavaConverters.seqAsJavaList(...) 
once the support for scala versions < 2.12 is stopped + sqlContext.createDataFrame(JavaConversions.seqAsJavaList(data), schema).rdd + } + + private def toJavaConsumer[T](consumer: (T) => Unit): Consumer[T] = { + new Consumer[T] { + override def accept(t: T): Unit = { + consumer(t) + } + } + } + + private def toJavaLongFunction[T](apply: (T) => Long): ToLongFunction[T] = { + new ToLongFunction[T] { + override def applyAsLong(t: T): Long = { + apply(t) + } + } + } + + private def toJavaPredicate[T](tst: (T) => Boolean): Predicate[T] = { + new Predicate[T] { + override def test(t: T): Boolean = { + tst(t) + } + } + } +} diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieTableValuedFunction.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieTableValuedFunction.scala index d08a496775e..867e83c301e 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieTableValuedFunction.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieTableValuedFunction.scala @@ -192,6 +192,69 @@ class TestHoodieTableValuedFunction extends HoodieSparkSqlTestBase { } } + test(s"Test hudi_filesystem_view") { + if (HoodieSparkUtils.gteqSpark3_2) { + withTempDir { tmp => + Seq( + ("cow", true), + ("mor", true), + ("cow", false), + ("mor", false) + ).foreach { parameters => + val tableType = parameters._1 + val isTableId = parameters._2 + + val tableName = generateTableName + val tablePath = s"${tmp.getCanonicalPath}/$tableName" + val identifier = if (isTableId) tableName else tablePath + spark.sql("set hoodie.sql.insert.mode = non-strict") + + spark.sql( + s""" + |create table $tableName ( + | id int, + | name string, + | price double + |) using hudi + |partitioned by (price) + |tblproperties ( + | type = '$tableType', + | primaryKey = 'id' + |) + |location '$tablePath' + |""".stripMargin + ) + + spark.sql( + s""" + | insert into 
$tableName + | values (1, 'a1', 10.0), (2, 'a2', 20.0), (3, 'a3', 30.0) + | """.stripMargin + ) + spark.sql( + s""" + | insert into $tableName + | values (4, 'a4', 10.0), (5, 'a5', 20.0), (6, 'a6', 30.0) + | """.stripMargin + ) + val result1DF = spark.sql(s"select * from hudi_filesystem_view('$identifier', 'price*')") + result1DF.show(false) + val result1Array = result1DF.select( + col("Partition_Path") + ).orderBy("Partition_Path").take(10) + checkAnswer(result1Array)( + Seq("price=10.0"), + Seq("price=10.0"), + Seq("price=20.0"), + Seq("price=20.0"), + Seq("price=30.0"), + Seq("price=30.0") + ) + } + } + } + } + test(s"Test hudi_table_changes cdc") { if (HoodieSparkUtils.gteqSpark3_2) { withTempDir { tmp => diff --git a/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/catalyst/plans/logcal/HoodieFileSystemViewTableValuedFunction.scala b/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/catalyst/plans/logcal/HoodieFileSystemViewTableValuedFunction.scala new file mode 100644 index 00000000000..c926dc0978d --- /dev/null +++ b/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/catalyst/plans/logcal/HoodieFileSystemViewTableValuedFunction.scala @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.plans.logcal + +import org.apache.hudi.DataSourceReadOptions +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression} +import org.apache.spark.sql.catalyst.plans.logical.LeafNode + +object HoodieFileSystemViewTableValuedFunctionOptionsParser { + def parseOptions(exprs: Seq[Expression], funcName: String): (String, Map[String, String]) = { + val args = exprs.map(_.eval().toString) + + if (args.size < 1 || args.size > 2) { + throw new AnalysisException(s"Expect arguments (table_name or table_path) for function `$funcName`") + } + + val identifier = args.head + val subPathOpts = if (args.size == 2) { + Map(DataSourceReadOptions.FILESYSTEM_RELATION_ARG_SUBPATH.key() -> args(1)) + } else { + Map.empty[String, String] + } + (identifier, Map(DataSourceReadOptions.CREATE_FILESYSTEM_RELATION.key() -> "true") ++ subPathOpts) + } +} + + +case class HoodieFileSystemViewTableValuedFunction(args: Seq[Expression]) extends LeafNode { + + override def output: Seq[Attribute] = Nil + + override lazy val resolved: Boolean = false + +} + +object HoodieFileSystemViewTableValuedFunction { + + val FUNC_NAME = "hudi_filesystem_view"; + +} + +case class HoodieFileSystemViewTableValuedFunctionByPath(args: Seq[Expression]) extends LeafNode { + + override def output: Seq[Attribute] = Nil + + override lazy val resolved: Boolean = false + +} diff --git a/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieSpark32PlusAnalysis.scala b/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieSpark32PlusAnalysis.scala index cedc6e6a9d3..bc8edc72295 100644 --- a/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieSpark32PlusAnalysis.scala +++ 
b/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieSpark32PlusAnalysis.scala @@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.analysis.SimpleAnalyzer.resolveExpressionBy import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, NamedRelation, ResolvedFieldName, UnresolvedAttribute, UnresolvedFieldName, UnresolvedPartitionSpec} import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogUtils} import org.apache.spark.sql.catalyst.expressions.Expression -import org.apache.spark.sql.catalyst.plans.logcal.{HoodieQuery, HoodieTableChanges, HoodieTableChangesOptionsParser, HoodieTimelineTableValuedFunction, HoodieTimelineTableValuedFunctionOptionsParser} +import org.apache.spark.sql.catalyst.plans.logcal.{HoodieFileSystemViewTableValuedFunction, HoodieFileSystemViewTableValuedFunctionOptionsParser, HoodieQuery, HoodieTableChanges, HoodieTableChangesOptionsParser, HoodieTimelineTableValuedFunction, HoodieTimelineTableValuedFunctionOptionsParser} import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.catalyst.trees.Origin @@ -119,6 +119,21 @@ case class HoodieSpark32PlusResolveReferences(spark: SparkSession) extends Rule[ catalogTable.location.toString)) LogicalRelation(relation, catalogTable) } + case HoodieFileSystemViewTableValuedFunction(args) => + val (tablePath, opts) = HoodieFileSystemViewTableValuedFunctionOptionsParser.parseOptions(args, HoodieFileSystemViewTableValuedFunction.FUNC_NAME) + val hoodieDataSource = new DefaultSource + if (tablePath.contains(Path.SEPARATOR)) { + // the first param is table path + val relation = hoodieDataSource.createRelation(spark.sqlContext, opts ++ Map("path" -> tablePath)) + LogicalRelation(relation) + } else { + // the first param is table identifier + val tableId = spark.sessionState.sqlParser.parseTableIdentifier(tablePath) + val catalogTable = 
spark.sessionState.catalog.getTableMetadata(tableId) + val relation = hoodieDataSource.createRelation(spark.sqlContext, opts ++ Map("path" -> + catalogTable.location.toString)) + LogicalRelation(relation, catalogTable) + } case mO@MatchMergeIntoTable(targetTableO, sourceTableO, _) // START: custom Hudi change: don't want to go to the spark mit resolution so we resolve the source and target // if they haven't been diff --git a/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/analysis/TableValuedFunctions.scala b/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/analysis/TableValuedFunctions.scala index bc76360cd17..b5f71389fab 100644 --- a/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/analysis/TableValuedFunctions.scala +++ b/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/analysis/TableValuedFunctions.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.hudi.analysis import org.apache.spark.sql.catalyst.FunctionIdentifier import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionInfo} -import org.apache.spark.sql.catalyst.plans.logcal.{HoodieQuery, HoodieTableChanges, HoodieTimelineTableValuedFunction} +import org.apache.spark.sql.catalyst.plans.logcal.{HoodieFileSystemViewTableValuedFunction, HoodieQuery, HoodieTableChanges, HoodieTimelineTableValuedFunction} object TableValuedFunctions { @@ -38,6 +38,11 @@ object TableValuedFunctions { FunctionIdentifier(HoodieTimelineTableValuedFunction.FUNC_NAME), new ExpressionInfo(HoodieTimelineTableValuedFunction.getClass.getCanonicalName, HoodieTimelineTableValuedFunction.FUNC_NAME), (args: Seq[Expression]) => new HoodieTimelineTableValuedFunction(args) + ), + ( + FunctionIdentifier(HoodieFileSystemViewTableValuedFunction.FUNC_NAME), + new ExpressionInfo(HoodieFileSystemViewTableValuedFunction.getClass.getCanonicalName, 
HoodieFileSystemViewTableValuedFunction.FUNC_NAME), + (args: Seq[Expression]) => new HoodieFileSystemViewTableValuedFunction(args) ) ) }