This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 12c26345f7c [HUDI-7261] TVF to query hudi table's filesystem state through spark-sql (#10414)
12c26345f7c is described below

commit 12c26345f7c92b27592fc2f52b433b8b0b37e9a7
Author: bhat-vinay <152183592+bhat-vi...@users.noreply.github.com>
AuthorDate: Wed Jan 3 12:34:47 2024 +0530

    [HUDI-7261] TVF to query hudi table's filesystem state through spark-sql (#10414)
    
    A new TVF, `hudi_filesystem_view(...)`, is added to support querying a hudi table's file-system state through spark-sql.
    The information displayed is similar to the 'fsview' command of hudi-cli.
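    
    A minimal invocation sketch (the table identifier below is a placeholder;
    the optional second argument is a sub-path pattern under the table's base
    path, as exercised by the new test):
    
        // List file-system view rows for partitions matching 'price*'.
        spark.sql("select * from hudi_filesystem_view('my_db.my_table', 'price*')").show(false)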
    
    Co-authored-by: Sagar Sumit <sagarsumi...@gmail.com>
---
 .../scala/org/apache/hudi/DataSourceOptions.scala  |  15 +++
 .../main/scala/org/apache/hudi/DefaultSource.scala |   5 +
 .../scala/org/apache/hudi/FileSystemRelation.scala | 137 +++++++++++++++++++++
 .../sql/hudi/TestHoodieTableValuedFunction.scala   |  63 ++++++++++
 .../HoodieFileSystemViewTableValuedFunction.scala  |  64 ++++++++++
 .../hudi/analysis/HoodieSpark32PlusAnalysis.scala  |  17 ++-
 .../sql/hudi/analysis/TableValuedFunctions.scala   |   7 +-
 7 files changed, 306 insertions(+), 2 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
index b99db9725e2..828e83fd106 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
@@ -226,6 +226,21 @@ object DataSourceReadOptions {
       .withDocumentation("When this is set, the result set of the table valued 
function hudi_query_timeline(...)" +
         " will include archived timeline")
 
+  val CREATE_FILESYSTEM_RELATION: ConfigProperty[String] = ConfigProperty
+    .key("hoodie.datasource.read.create.filesystem.relation")
+    .defaultValue("false")
+    .markAdvanced()
+    .sinceVersion("1.0.0")
+    .withDocumentation("When this is set, the relation created by DefaultSource is for a view representing" +
+      " the result set of the table valued function hudi_filesystem_view(...)")
+
+  val FILESYSTEM_RELATION_ARG_SUBPATH: ConfigProperty[String] =
+    ConfigProperty.key("hoodie.datasource.read.table.valued.function.filesystem.relation.subpath")
+      .defaultValue("")
+      .markAdvanced()
+      .sinceVersion("1.0.0")
+      .withDocumentation("A regex under the table's base path to get file system view information")
+
   /** @deprecated Use {@link QUERY_TYPE} and its methods instead */
   @Deprecated
   val QUERY_TYPE_OPT_KEY = QUERY_TYPE.key()
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
index 87d527fdfe2..601c5e6f526 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
@@ -224,9 +224,14 @@ object DefaultSource {
       parameters.get(INCREMENTAL_FORMAT.key).contains(INCREMENTAL_FORMAT_CDC_VAL)
     val isMultipleBaseFileFormatsEnabled = metaClient.getTableConfig.isMultipleBaseFileFormatsEnabled
 
+
     val createTimeLineRln = parameters.get(DataSourceReadOptions.CREATE_TIMELINE_RELATION.key())
+    val createFSRln = parameters.get(DataSourceReadOptions.CREATE_FILESYSTEM_RELATION.key())
+
     if (createTimeLineRln.isDefined) {
       new TimelineRelation(sqlContext, parameters, metaClient)
+    } else if (createFSRln.isDefined) {
+      new FileSystemRelation(sqlContext, parameters, metaClient)
     } else {
       log.info(s"Is bootstrapped table => $isBootstrappedTable, tableType is: 
$tableType, queryType is: $queryType")
 
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/FileSystemRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/FileSystemRelation.scala
new file mode 100644
index 00000000000..90e6919e4fa
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/FileSystemRelation.scala
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.model.{FileSlice, HoodieFileGroup, HoodieLogFile}
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.sources.{BaseRelation, TableScan}
+import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}
+import org.apache.spark.sql.{Row, SQLContext}
+import org.slf4j.LoggerFactory
+
+import java.util.function.{Consumer, Predicate, ToLongFunction}
+import scala.collection.JavaConversions
+
+/**
+ * Relation implementing Hudi's file-system view for the table
+ * valued function hudi_filesystem_view(...).
+ *
+ * The relation implements a simple buildScan() routine and does not support
+ * any filtering primitives. Any column or predicate filtering needs to be done
+ * explicitly by the execution layer.
+ *
+ */
+class FileSystemRelation(val sqlContext: SQLContext,
+                         val optParams: Map[String, String],
+                         val metaClient: HoodieTableMetaClient) extends BaseRelation with TableScan {
+
+  private val log = LoggerFactory.getLogger(classOf[FileSystemRelation])
+
+  // The schema for the FileSystemRelation view
+  override def schema: StructType = StructType(Array(
+    StructField("File_ID", StringType, nullable = true),
+    StructField("Partition_Path", StringType, nullable = true),
+    StructField("Base_Instant_Time", StringType, nullable = true),
+    StructField("Base_File_Path", StringType, nullable = true),
+    StructField("Base_File_Size", LongType, nullable = true),
+    StructField("Log_File_Count", LongType, nullable = true),
+    StructField("Log_File_Size", LongType, nullable = true),
+    StructField("Log_File_Scheduled", LongType, nullable = true),
+    StructField("Log_File_Unscheduled", LongType, nullable = true)
+  ))
+
+  // The buildScan(...) method implementation from TableScan
+  // This builds the dataframe containing all the columns for the FileSystemView
+  override def buildScan(): RDD[Row] = {
+    val data = collection.mutable.ArrayBuffer[Row]()
+    val subPath = optParams.getOrElse(DataSourceReadOptions.FILESYSTEM_RELATION_ARG_SUBPATH.key(), "")
+    val path = String.format("%s/%s/*", metaClient.getBasePathV2, subPath)
+    val fileStatusList = FSUtils.getGlobStatusExcludingMetaFolder(metaClient.getFs, new Path(path))
+
+
+    val fsView = new HoodieTableFileSystemView(metaClient, metaClient.getActiveTimeline.getWriteTimeline, fileStatusList.toArray(new Array[FileStatus](0)))
+    val fileGroups = fsView.getAllFileGroups
+
+    fileGroups.forEach(toJavaConsumer((fg: HoodieFileGroup) => {
+        fg.getAllFileSlices.forEach(toJavaConsumer((fs: FileSlice) => {
+        val logFileSize = fs.getLogFiles.mapToLong(toJavaLongFunction((lf: HoodieLogFile) => {
+          lf.getFileSize
+        })).sum()
+
+        val logFileCompactionSize = fs.getLogFiles.filter(toJavaPredicate((lf: HoodieLogFile) => {
+          lf.getDeltaCommitTime == fs.getBaseInstantTime
+        })).mapToLong(toJavaLongFunction((lf: HoodieLogFile) => {
+          lf.getFileSize
+        })).sum()
+
+        val logFileNonCompactionSize = fs.getLogFiles.filter(toJavaPredicate((lf: HoodieLogFile) => {
+          lf.getDeltaCommitTime != fs.getBaseInstantTime
+        })).mapToLong(toJavaLongFunction((lf: HoodieLogFile) => {
+          lf.getFileSize
+        })).sum()
+
+
+        val r = Row(
+          fg.getFileGroupId.getFileId,
+          fg.getPartitionPath,
+          fs.getBaseInstantTime,
+          if (fs.getBaseFile.isPresent) fs.getBaseFile.get.getPath else "",
+          if (fs.getBaseFile.isPresent) fs.getBaseFile.get.getFileSize else -1,
+          fs.getLogFiles.count,
+          logFileSize,
+          logFileCompactionSize,
+          logFileNonCompactionSize
+        )
+        data += r
+      }))
+    }))
+
+    // Using deprecated `JavaConversions` to stay compatible with Scala versions < 2.12.
+    // Can be replaced with JavaConverters.seqAsJavaList(...) once support for Scala versions < 2.12 is dropped.
+    sqlContext.createDataFrame(JavaConversions.seqAsJavaList(data), schema).rdd
+  }
+
+  private def toJavaConsumer[T](consumer: (T) => Unit): Consumer[T] = {
+    new Consumer[T] {
+      override def accept(t: T): Unit = {
+        consumer(t)
+      }
+    }
+  }
+
+  private def toJavaLongFunction[T](apply: (T) => Long): ToLongFunction[T] = {
+    new ToLongFunction[T] {
+      override def applyAsLong(t: T): Long = {
+        apply(t)
+      }
+    }
+  }
+
+  private def toJavaPredicate[T](tst: (T) => Boolean): Predicate[T] = {
+    new Predicate[T] {
+      override def test(t: T): Boolean = {
+        tst(t)
+      }
+    }
+  }
+}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieTableValuedFunction.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieTableValuedFunction.scala
index d08a496775e..867e83c301e 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieTableValuedFunction.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieTableValuedFunction.scala
@@ -192,6 +192,69 @@ class TestHoodieTableValuedFunction extends HoodieSparkSqlTestBase {
     }
   }
 
+  test(s"Test hudi_filesystem_view") {
+    if (HoodieSparkUtils.gteqSpark3_2) {
+      withTempDir { tmp =>
+        Seq(
+          ("cow", true),
+          ("mor", true),
+          ("cow", false),
+          ("mor", false)
+        ).foreach { parameters =>
+          val tableType = parameters._1
+          val isTableId = parameters._2
+
+          val tableName = generateTableName
+          val tablePath = s"${tmp.getCanonicalPath}/$tableName"
+          val identifier = if (isTableId) tableName else tablePath
+          spark.sql("set hoodie.sql.insert.mode = non-strict")
+
+          spark.sql(
+            s"""
+               |create table $tableName (
+               |  id int,
+               |  name string,
+               |  price double
+               |) using hudi
+               |partitioned by (price)
+               |tblproperties (
+               |  type = '$tableType',
+               |  primaryKey = 'id'
+               |)
+               |location '$tablePath'
+               |""".stripMargin
+          )
+
+          spark.sql(
+            s"""
+               | insert into $tableName
+               | values (1, 'a1', 10.0), (2, 'a2', 20.0), (3, 'a3', 30.0)
+               | """.stripMargin
+          )
+          spark.sql(
+            s"""
+               | insert into $tableName
+               | values (4, 'a4', 10.0), (5, 'a5', 20.0), (6, 'a6', 30.0)
+               | """.stripMargin
+          )
+          val result1DF = spark.sql(s"select * from hudi_filesystem_view('$identifier', 'price*')")
+          result1DF.show(false)
+          val result1Array = result1DF.select(
+              col("Partition_Path")
+            ).orderBy("Partition_Path").take(10)
+          checkAnswer(result1Array)(
+            Seq("price=10.0"),
+            Seq("price=10.0"),
+            Seq("price=20.0"),
+            Seq("price=20.0"),
+            Seq("price=30.0"),
+            Seq("price=30.0")
+          )
+        }
+      }
+    }
+  }
+
   test(s"Test hudi_table_changes cdc") {
     if (HoodieSparkUtils.gteqSpark3_2) {
       withTempDir { tmp =>
diff --git a/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/catalyst/plans/logcal/HoodieFileSystemViewTableValuedFunction.scala b/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/catalyst/plans/logcal/HoodieFileSystemViewTableValuedFunction.scala
new file mode 100644
index 00000000000..c926dc0978d
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/catalyst/plans/logcal/HoodieFileSystemViewTableValuedFunction.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.logcal
+
+import org.apache.hudi.DataSourceReadOptions
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
+import org.apache.spark.sql.catalyst.plans.logical.LeafNode
+
+object HoodieFileSystemViewTableValuedFunctionOptionsParser {
+  def parseOptions(exprs: Seq[Expression], funcName: String): (String, Map[String, String]) = {
+    val args = exprs.map(_.eval().toString)
+
+    if (args.size < 1 || args.size > 2) {
+      throw new AnalysisException(s"Expect arguments (table_name or table_path) for function `$funcName`")
+    }
+
+    val identifier = args.head
+    val subPathOpts = if (args.size == 2) {
+      Map(DataSourceReadOptions.FILESYSTEM_RELATION_ARG_SUBPATH.key() -> args(1))
+    } else {
+      Map.empty[String, String]
+    }
+    (identifier, Map(DataSourceReadOptions.CREATE_FILESYSTEM_RELATION.key() -> "true") ++ subPathOpts)
+  }
+}
+
+
+case class HoodieFileSystemViewTableValuedFunction(args: Seq[Expression]) extends LeafNode {
+
+  override def output: Seq[Attribute] = Nil
+
+  override lazy val resolved: Boolean = false
+
+}
+
+object HoodieFileSystemViewTableValuedFunction {
+
+  val FUNC_NAME = "hudi_filesystem_view"
+
+}
+
+case class HoodieFileSystemViewTableValuedFunctionByPath(args: Seq[Expression]) extends LeafNode {
+
+  override def output: Seq[Attribute] = Nil
+
+  override lazy val resolved: Boolean = false
+
+}
diff --git a/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieSpark32PlusAnalysis.scala b/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieSpark32PlusAnalysis.scala
index cedc6e6a9d3..bc8edc72295 100644
--- a/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieSpark32PlusAnalysis.scala
+++ b/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieSpark32PlusAnalysis.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.analysis.SimpleAnalyzer.resolveExpressionBy
 import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, NamedRelation, ResolvedFieldName, UnresolvedAttribute, UnresolvedFieldName, UnresolvedPartitionSpec}
 import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogUtils}
 import org.apache.spark.sql.catalyst.expressions.Expression
-import org.apache.spark.sql.catalyst.plans.logcal.{HoodieQuery, HoodieTableChanges, HoodieTableChangesOptionsParser, HoodieTimelineTableValuedFunction, HoodieTimelineTableValuedFunctionOptionsParser}
+import org.apache.spark.sql.catalyst.plans.logcal.{HoodieFileSystemViewTableValuedFunction, HoodieFileSystemViewTableValuedFunctionOptionsParser, HoodieQuery, HoodieTableChanges, HoodieTableChangesOptionsParser, HoodieTimelineTableValuedFunction, HoodieTimelineTableValuedFunctionOptionsParser}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.trees.Origin
@@ -119,6 +119,21 @@ case class HoodieSpark32PlusResolveReferences(spark: SparkSession) extends Rule[
           catalogTable.location.toString))
         LogicalRelation(relation, catalogTable)
       }
+    case HoodieFileSystemViewTableValuedFunction(args) =>
+      val (tablePath, opts) = HoodieFileSystemViewTableValuedFunctionOptionsParser.parseOptions(args, HoodieFileSystemViewTableValuedFunction.FUNC_NAME)
+      val hoodieDataSource = new DefaultSource
+      if (tablePath.contains(Path.SEPARATOR)) {
+        // the first param is table path
+        val relation = hoodieDataSource.createRelation(spark.sqlContext, opts ++ Map("path" -> tablePath))
+        LogicalRelation(relation)
+      } else {
+        // the first param is table identifier
+        val tableId = spark.sessionState.sqlParser.parseTableIdentifier(tablePath)
+        val catalogTable = spark.sessionState.catalog.getTableMetadata(tableId)
+        val relation = hoodieDataSource.createRelation(spark.sqlContext, opts ++ Map("path" ->
+          catalogTable.location.toString))
+        LogicalRelation(relation, catalogTable)
+      }
     case mO@MatchMergeIntoTable(targetTableO, sourceTableO, _)
       // START: custom Hudi change: don't want to go to the spark mit resolution so we resolve the source and target
       // if they haven't been
diff --git a/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/analysis/TableValuedFunctions.scala b/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/analysis/TableValuedFunctions.scala
index bc76360cd17..b5f71389fab 100644
--- a/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/analysis/TableValuedFunctions.scala
+++ b/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/analysis/TableValuedFunctions.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.hudi.analysis
 
 import org.apache.spark.sql.catalyst.FunctionIdentifier
 import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionInfo}
-import org.apache.spark.sql.catalyst.plans.logcal.{HoodieQuery, HoodieTableChanges, HoodieTimelineTableValuedFunction}
+import org.apache.spark.sql.catalyst.plans.logcal.{HoodieFileSystemViewTableValuedFunction, HoodieQuery, HoodieTableChanges, HoodieTimelineTableValuedFunction}
 
 object TableValuedFunctions {
 
@@ -38,6 +38,11 @@ object TableValuedFunctions {
       FunctionIdentifier(HoodieTimelineTableValuedFunction.FUNC_NAME),
       new ExpressionInfo(HoodieTimelineTableValuedFunction.getClass.getCanonicalName, HoodieTimelineTableValuedFunction.FUNC_NAME),
       (args: Seq[Expression]) => new HoodieTimelineTableValuedFunction(args)
+    ),
+    (
+      FunctionIdentifier(HoodieFileSystemViewTableValuedFunction.FUNC_NAME),
+      new ExpressionInfo(HoodieFileSystemViewTableValuedFunction.getClass.getCanonicalName, HoodieFileSystemViewTableValuedFunction.FUNC_NAME),
+      (args: Seq[Expression]) => new HoodieFileSystemViewTableValuedFunction(args)
     )
   )
 }
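
For anyone trying this out, a hedged usage sketch (the table identifier is a
placeholder, not part of this commit; the column names come from
FileSystemRelation.schema above):

    // Summarize on-disk state per partition via the new TVF. The optional
    // second argument ('*' here) is a sub-path pattern under the base path.
    // Note: Base_File_Size is -1 for file slices that have no base file yet.
    import org.apache.spark.sql.functions.sum
    val fsv = spark.sql("select * from hudi_filesystem_view('my_db.my_table', '*')")
    fsv.groupBy("Partition_Path")
      .agg(
        sum("Base_File_Size").as("total_base_bytes"),
        sum("Log_File_Size").as("total_log_bytes"),
        sum("Log_File_Count").as("log_file_count"))
      .show(false)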
