This is an automated email from the ASF dual-hosted git repository.

mengtao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 1bb017d396 [HUDI-3508] Add call procedure for FileSystemViewCommand 
(#5929)
1bb017d396 is described below

commit 1bb017d396562b891065187bac417c52edea24da
Author: jiz <[email protected]>
AuthorDate: Wed Jun 22 17:50:20 2022 +0800

    [HUDI-3508] Add call procedure for FileSystemViewCommand (#5929)
    
    * [HUDI-3508] Add call procedure for FileSystemView
    
    * minor
    
    Co-authored-by: jiimmyzhan <[email protected]>
---
 .../hudi/command/procedures/HoodieProcedures.scala |   2 +
 .../procedures/ShowFileSystemViewProcedure.scala   | 258 +++++++++++++++++++++
 .../sql/hudi/procedure/TestFsViewProcedure.scala   |  95 ++++++++
 3 files changed, 355 insertions(+)

diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
index ff129964fa..d974e216d5 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
@@ -48,6 +48,8 @@ object HoodieProcedures {
     mapBuilder.put(ShowRollbacksProcedure.NAME, ShowRollbacksProcedure.builder)
     mapBuilder.put(ShowRollbackDetailProcedure.NAME, 
ShowRollbackDetailProcedure.builder)
     mapBuilder.put(ExportInstantsProcedure.NAME, 
ExportInstantsProcedure.builder)
+    mapBuilder.put(ShowAllFileSystemViewProcedure.NAME, 
ShowAllFileSystemViewProcedure.builder)
+    mapBuilder.put(ShowLatestFileSystemViewProcedure.NAME, 
ShowLatestFileSystemViewProcedure.builder)
     mapBuilder.build
   }
 }
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowFileSystemViewProcedure.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowFileSystemViewProcedure.scala
new file mode 100644
index 0000000000..8c861cf0f6
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowFileSystemViewProcedure.scala
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.command.procedures
+
+import com.google.common.collect.Lists
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.model.{FileSlice, HoodieLogFile}
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.table.timeline.{HoodieDefaultTimeline, 
HoodieInstant, HoodieTimeline}
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView
+import org.apache.hudi.common.util
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, 
StructType}
+
+import java.util.function.{Function, Supplier}
+import java.util.stream.Collectors
+import scala.collection.JavaConverters.{asJavaIteratorConverter, 
asScalaIteratorConverter}
+
+class ShowFileSystemViewProcedure(showLatest: Boolean) extends BaseProcedure 
with ProcedureBuilder {
+  private val PARAMETERS_ALL: Array[ProcedureParameter] = 
Array[ProcedureParameter](
+    ProcedureParameter.required(0, "table", DataTypes.StringType, None),
+    ProcedureParameter.optional(1, "max_instant", DataTypes.StringType, ""),
+    ProcedureParameter.optional(2, "include_max", DataTypes.BooleanType, 
false),
+    ProcedureParameter.optional(3, "include_inflight", DataTypes.BooleanType, 
false),
+    ProcedureParameter.optional(4, "exclude_compaction", 
DataTypes.BooleanType, false),
+    ProcedureParameter.optional(5, "limit", DataTypes.IntegerType, 10),
+    ProcedureParameter.optional(6, "path_regex", DataTypes.StringType, "*/*/*")
+  )
+
+  private val OUTPUT_TYPE_ALL: StructType = StructType(Array[StructField](
+    StructField("partition", DataTypes.StringType, nullable = true, 
Metadata.empty),
+    StructField("file_id", DataTypes.StringType, nullable = true, 
Metadata.empty),
+    StructField("base_instant", DataTypes.StringType, nullable = true, 
Metadata.empty),
+    StructField("data_file", DataTypes.StringType, nullable = true, 
Metadata.empty),
+    StructField("data_file_size", DataTypes.LongType, nullable = true, 
Metadata.empty),
+    StructField("num_delta_files", DataTypes.LongType, nullable = true, 
Metadata.empty),
+    StructField("total_delta_file_size", DataTypes.LongType, nullable = true, 
Metadata.empty),
+    StructField("delta_files", DataTypes.StringType, nullable = true, 
Metadata.empty)
+  ))
+
+  private val PARAMETERS_LATEST: Array[ProcedureParameter] = 
Array[ProcedureParameter](
+    ProcedureParameter.required(0, "table", DataTypes.StringType, None),
+    ProcedureParameter.optional(1, "max_instant", DataTypes.StringType, ""),
+    ProcedureParameter.optional(2, "include_max", DataTypes.BooleanType, 
false),
+    ProcedureParameter.optional(3, "include_inflight", DataTypes.BooleanType, 
false),
+    ProcedureParameter.optional(4, "exclude_compaction", 
DataTypes.BooleanType, false),
+    ProcedureParameter.optional(5, "limit", DataTypes.IntegerType, 10),
+    ProcedureParameter.required(6, "partition_path", DataTypes.StringType, 
None),
+    ProcedureParameter.optional(7, "merge", DataTypes.BooleanType, true)
+
+  )
+
+  private val OUTPUT_TYPE_LATEST: StructType = StructType(Array[StructField](
+    StructField("partition", DataTypes.StringType, nullable = true, 
Metadata.empty),
+    StructField("file_id", DataTypes.StringType, nullable = true, 
Metadata.empty),
+    StructField("base_instant", DataTypes.StringType, nullable = true, 
Metadata.empty),
+    StructField("data_file", DataTypes.StringType, nullable = true, 
Metadata.empty),
+    StructField("data_file_size", DataTypes.LongType, nullable = true, 
Metadata.empty),
+    StructField("num_delta_files", DataTypes.LongType, nullable = true, 
Metadata.empty),
+    StructField("total_delta_file_size", DataTypes.LongType, nullable = true, 
Metadata.empty),
+    StructField("delta_size_compaction_scheduled", DataTypes.LongType, 
nullable = true, Metadata.empty),
+    StructField("delta_size_compaction_unscheduled", DataTypes.LongType, 
nullable = true, Metadata.empty),
+    StructField("delta_to_base_radio_compaction_scheduled", 
DataTypes.DoubleType, nullable = true, Metadata.empty),
+    StructField("delta_to_base_radio_compaction_unscheduled", 
DataTypes.DoubleType, nullable = true, Metadata.empty),
+    StructField("delta_files_compaction_scheduled", DataTypes.StringType, 
nullable = true, Metadata.empty),
+    StructField("delta_files_compaction_unscheduled", DataTypes.StringType, 
nullable = true, Metadata.empty)
+  ))
+
+  private def buildFileSystemView(table: Option[Any],
+                                  globRegex: String,
+                                  maxInstant: String,
+                                  includeMaxInstant: Boolean,
+                                  includeInflight: Boolean,
+                                  excludeCompaction: Boolean
+                                 ): HoodieTableFileSystemView = {
+    val basePath = getBasePath(table)
+    val metaClient = 
HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build
+    val fs = metaClient.getFs
+    val globPath = String.format("%s/%s/*", basePath, globRegex)
+    val statuses = FSUtils.getGlobStatusExcludingMetaFolder(fs, new 
Path(globPath))
+    var timeline: HoodieTimeline = if (excludeCompaction) {
+      metaClient.getActiveTimeline.getCommitsTimeline
+    } else {
+      metaClient.getActiveTimeline.getWriteTimeline
+    }
+    if (!includeInflight) {
+      timeline = timeline.filterCompletedInstants()
+    }
+    var instants = timeline.getInstants.iterator().asScala
+    if (maxInstant.nonEmpty) {
+      val predicate = if (includeMaxInstant) {
+        HoodieTimeline.GREATER_THAN_OR_EQUALS
+      } else {
+        HoodieTimeline.GREATER_THAN
+      }
+      instants = instants.filter(instant => predicate.test(maxInstant, 
instant.getTimestamp))
+    }
+
+    val details = new Function[HoodieInstant, 
org.apache.hudi.common.util.Option[Array[Byte]]]
+      with java.io.Serializable {
+      override def apply(instant: HoodieInstant): util.Option[Array[Byte]] = {
+        metaClient.getActiveTimeline.getInstantDetails(instant)
+      }
+    }
+    val filteredTimeline = new 
HoodieDefaultTimeline(Lists.newArrayList(instants.asJava).stream(), details)
+    new HoodieTableFileSystemView(metaClient, filteredTimeline, 
statuses.toArray(new Array[FileStatus](0)))
+  }
+
+  private def showAllFileSlices(fsView: HoodieTableFileSystemView): 
java.util.List[Row] = {
+    val rows: java.util.List[Row] = Lists.newArrayList()
+    fsView.getAllFileGroups.iterator().asScala.foreach(fg => {
+      fg.getAllFileSlices.iterator().asScala.foreach(fs => {
+        val fileId = fg.getFileGroupId.getFileId
+        var baseFilePath = ""
+        var baseFileSize = 0L
+        if (fs.getBaseFile.isPresent) {
+          baseFilePath = fs.getBaseFile.get.getPath
+          baseFileSize = fs.getBaseFile.get.getFileSize
+        }
+        val numLogFiles = fs.getLogFiles.count()
+        val sumLogFileSize = 
fs.getLogFiles.iterator().asScala.map(_.getFileSize).sum
+        val logFiles = 
fs.getLogFiles.collect(Collectors.toList[HoodieLogFile]).toString
+
+        rows.add(Row(fg.getPartitionPath, fileId, fs.getBaseInstantTime, 
baseFilePath, baseFileSize, numLogFiles,
+          sumLogFileSize, logFiles))
+      })
+    })
+    rows
+  }
+
+  private def showLatestFileSlices(fsView: HoodieTableFileSystemView,
+                                   table: Option[Any],
+                                   partition: String,
+                                   maxInstant: String,
+                                   merge: Boolean): java.util.List[Row] = {
+    var fileSliceStream: java.util.stream.Stream[FileSlice] = null
+    if (!merge) {
+      fileSliceStream = fsView.getLatestFileSlices(partition)
+    } else {
+      fileSliceStream = fsView.getLatestMergedFileSlicesBeforeOrOn(partition, 
if (maxInstant.isEmpty) {
+        val basePath = getBasePath(table)
+        val metaClient = 
HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build
+        
metaClient.getActiveTimeline.filterCompletedAndCompactionInstants().lastInstant().get().getTimestamp
+      } else {
+        maxInstant
+      })
+    }
+    val rows: java.util.List[Row] = Lists.newArrayList()
+    fileSliceStream.iterator().asScala.foreach {
+      fs => {
+        val fileId = fs.getFileId
+        val baseInstantTime = fs.getBaseInstantTime
+        var baseFilePath = ""
+        var baseFileSize = 0L
+        if (fs.getBaseFile.isPresent) {
+          baseFilePath = fs.getBaseFile.get.getPath
+          baseFileSize = fs.getBaseFile.get.getFileSize
+        }
+        val numLogFiles = fs.getLogFiles.count()
+        val sumLogFileSize = 
fs.getLogFiles.iterator().asScala.map(_.getFileSize).sum
+        val logFilesScheduledForCompactionTotalSize = 
fs.getLogFiles.iterator().asScala
+          .filter(logFile => 
logFile.getBaseCommitTime.equals(fs.getBaseInstantTime))
+          .map(_.getFileSize).sum
+        val logFilesUnscheduledTotalSize = fs.getLogFiles.iterator().asScala
+          .filter(logFile => 
!logFile.getBaseCommitTime.equals(fs.getBaseInstantTime))
+          .map(_.getFileSize).sum
+        val logSelectedForCompactionToBaseRatio = if (baseFileSize > 0) {
+          logFilesScheduledForCompactionTotalSize / (baseFileSize * 1.0)
+        } else {
+          -1
+        }
+        val logUnscheduledToBaseRatio = if (baseFileSize > 0) {
+          logFilesUnscheduledTotalSize / (baseFileSize * 1.0)
+        } else {
+          -1
+        }
+        val logFilesCommitTimeEqualInstantTime = 
fs.getLogFiles.iterator().asScala
+          .filter(logFile => 
logFile.getBaseCommitTime.equals(fs.getBaseInstantTime))
+          .mkString("[", ",", "]")
+        val logFilesCommitTimeNonEqualInstantTime = 
fs.getLogFiles.iterator().asScala
+          .filter(logFile => 
!logFile.getBaseCommitTime.equals(fs.getBaseInstantTime))
+          .mkString("[", ",", "]")
+        rows.add(Row(partition, fileId, baseInstantTime, baseFilePath, 
baseFileSize, numLogFiles, sumLogFileSize,
+          logFilesScheduledForCompactionTotalSize, 
logFilesUnscheduledTotalSize, logSelectedForCompactionToBaseRatio,
+          logUnscheduledToBaseRatio, logFilesCommitTimeEqualInstantTime, 
logFilesCommitTimeNonEqualInstantTime
+        ))
+      }
+    }
+    rows
+  }
+
+  override def parameters: Array[ProcedureParameter] = if (showLatest) {
+    PARAMETERS_LATEST
+  } else {
+    PARAMETERS_ALL
+  }
+
+  override def outputType: StructType = if (showLatest) {
+    OUTPUT_TYPE_LATEST
+  } else {
+    OUTPUT_TYPE_ALL
+  }
+
+  override def call(args: ProcedureArgs): Seq[Row] = {
+    super.checkArgs(parameters, args)
+    val table = getArgValueOrDefault(args, parameters(0))
+    val maxInstant = getArgValueOrDefault(args, 
parameters(1)).get.asInstanceOf[String]
+    val includeMax = getArgValueOrDefault(args, 
parameters(2)).get.asInstanceOf[Boolean]
+    val includeInflight = getArgValueOrDefault(args, 
parameters(3)).get.asInstanceOf[Boolean]
+    val excludeCompaction = getArgValueOrDefault(args, 
parameters(4)).get.asInstanceOf[Boolean]
+    val limit = getArgValueOrDefault(args, parameters(5)).get.asInstanceOf[Int]
+    val rows: java.util.List[Row] = if (!showLatest) {
+      val globRegex = getArgValueOrDefault(args, 
parameters(6)).get.asInstanceOf[String]
+      val fsView = buildFileSystemView(table, globRegex, maxInstant, 
includeMax, includeInflight, excludeCompaction)
+      showAllFileSlices(fsView)
+    } else {
+      val partitionPath = getArgValueOrDefault(args, 
parameters(6)).get.asInstanceOf[String]
+      val merge = getArgValueOrDefault(args, 
parameters(7)).get.asInstanceOf[Boolean]
+      val fsView = buildFileSystemView(table, partitionPath, maxInstant, 
includeMax, includeInflight, excludeCompaction)
+      showLatestFileSlices(fsView, table, partitionPath, maxInstant, merge)
+    }
+    rows.stream().limit(limit).toArray().map(r => r.asInstanceOf[Row]).toList
+  }
+
+  override def build: Procedure = new ShowFileSystemViewProcedure(showLatest)
+}
+
+object ShowAllFileSystemViewProcedure {
+  val NAME = "show_fsview_all"
+
+  def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
+    override def get() = new ShowFileSystemViewProcedure(false)
+  }
+}
+
+object ShowLatestFileSystemViewProcedure {
+  val NAME = "show_fsview_latest"
+
+  def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
+    override def get(): ProcedureBuilder = new 
ShowFileSystemViewProcedure(true)
+  }
+}
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestFsViewProcedure.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestFsViewProcedure.scala
new file mode 100644
index 0000000000..69d08e37df
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestFsViewProcedure.scala
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.procedure
+
+import org.apache.spark.sql.hudi.HoodieSparkSqlTestBase
+
+class TestFsViewProcedure extends HoodieSparkSqlTestBase {
+  test("Test Call show_fsview_all Procedure") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      // create table
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  id int,
+           |  name string,
+           |  price double,
+           |  ts long
+           |) using hudi
+           | partitioned by (ts)
+           | location '${tmp.getCanonicalPath}/$tableName'
+           | tblproperties (
+           |  primaryKey = 'id',
+           |  preCombineField = 'ts'
+           | )
+       """.stripMargin)
+      // insert data to table
+      spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000")
+      spark.sql(s"insert into $tableName select 2, 'a2', 20, 1500")
+
+      // Check required fields
+      checkExceptionContain(s"""call show_fsview_all(limit => 10)""")(
+        s"Argument: table is required")
+
+      // collect result for table
+      val result = spark.sql(
+        s"""call show_fsview_all(table => '$tableName', path_regex => '*/', 
limit => 10)""".stripMargin).collect()
+      assertResult(2) {
+        result.length
+      }
+    }
+  }
+
+  test("Test Call show_fsview_latest Procedure") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      // create table
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  id int,
+           |  name string,
+           |  price double,
+           |  ts long
+           |) using hudi
+           | partitioned by (ts)
+           | location '${tmp.getCanonicalPath}/$tableName'
+           | tblproperties (
+           |  type = 'mor',
+           |  primaryKey = 'id',
+           |  preCombineField = 'ts'
+           | )
+       """.stripMargin)
+      // insert data to table
+      spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000")
+      spark.sql(s"insert into $tableName select 2, 'a2', 20, 1500")
+
+      // Check required fields
+      checkExceptionContain(s"""call show_fsview_latest(limit => 10)""")(
+        s"Argument: table is required")
+
+      // collect result for table
+      val result = spark.sql(
+        s"""call show_fsview_latest(table => '$tableName', partition_path => 
'ts=1000', limit => 10)""".stripMargin).collect()
+      assertResult(1) {
+        result.length
+      }
+    }
+  }
+}

Reply via email to