This is an automated email from the ASF dual-hosted git repository.
mengtao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 1bb017d396 [HUDI-3508] Add call procedure for FileSystemViewCommand (#5929)
1bb017d396 is described below
commit 1bb017d396562b891065187bac417c52edea24da
Author: jiz <[email protected]>
AuthorDate: Wed Jun 22 17:50:20 2022 +0800
[HUDI-3508] Add call procedure for FileSystemViewCommand (#5929)
* [HUDI-3508] Add call procedure for FileSystemView
* minor
Co-authored-by: jiimmyzhan <[email protected]>
---
.../hudi/command/procedures/HoodieProcedures.scala | 2 +
.../procedures/ShowFileSystemViewProcedure.scala | 258 +++++++++++++++++++++
.../sql/hudi/procedure/TestFsViewProcedure.scala | 95 ++++++++
3 files changed, 355 insertions(+)
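
For reference, the procedures added here are invoked through Spark SQL CALL
statements, mirroring the tests in this commit. A minimal sketch of both calls
(the table name 'h0' is illustrative; 'ts=1000' matches the partition layout
used in the tests):

    // list every file slice under partitions matching path_regex
    spark.sql("call show_fsview_all(table => 'h0', path_regex => '*/', limit => 10)").show(false)

    // list the latest file slice of each file group in one partition
    spark.sql("call show_fsview_latest(table => 'h0', partition_path => 'ts=1000', limit => 10)").show(false)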
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
index ff129964fa..d974e216d5 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
@@ -48,6 +48,8 @@ object HoodieProcedures {
     mapBuilder.put(ShowRollbacksProcedure.NAME, ShowRollbacksProcedure.builder)
     mapBuilder.put(ShowRollbackDetailProcedure.NAME, ShowRollbackDetailProcedure.builder)
     mapBuilder.put(ExportInstantsProcedure.NAME, ExportInstantsProcedure.builder)
+    mapBuilder.put(ShowAllFileSystemViewProcedure.NAME, ShowAllFileSystemViewProcedure.builder)
+    mapBuilder.put(ShowLatestFileSystemViewProcedure.NAME, ShowLatestFileSystemViewProcedure.builder)
     mapBuilder.build
   }
 }
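
Each entry above maps a procedure's NAME constant to the Supplier that builds
it; this registry is what makes a procedure callable by name from SQL. A
hypothetical registration following the same contract (all names illustrative):

    object ShowSomethingProcedure {
      val NAME = "show_something"

      def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
        override def get(): ProcedureBuilder = new ShowSomethingProcedure
      }
    }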
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowFileSystemViewProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowFileSystemViewProcedure.scala
new file mode 100644
index 0000000000..8c861cf0f6
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowFileSystemViewProcedure.scala
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.command.procedures
+
+import com.google.common.collect.Lists
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.model.{FileSlice, HoodieLogFile}
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.table.timeline.{HoodieDefaultTimeline, HoodieInstant, HoodieTimeline}
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView
+import org.apache.hudi.common.util
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}
+
+import java.util.function.{Function, Supplier}
+import java.util.stream.Collectors
+import scala.collection.JavaConverters.{asJavaIteratorConverter, asScalaIteratorConverter}
+
+class ShowFileSystemViewProcedure(showLatest: Boolean) extends BaseProcedure with ProcedureBuilder {
+  private val PARAMETERS_ALL: Array[ProcedureParameter] = Array[ProcedureParameter](
+    ProcedureParameter.required(0, "table", DataTypes.StringType, None),
+    ProcedureParameter.optional(1, "max_instant", DataTypes.StringType, ""),
+    ProcedureParameter.optional(2, "include_max", DataTypes.BooleanType, false),
+    ProcedureParameter.optional(3, "include_inflight", DataTypes.BooleanType, false),
+    ProcedureParameter.optional(4, "exclude_compaction", DataTypes.BooleanType, false),
+    ProcedureParameter.optional(5, "limit", DataTypes.IntegerType, 10),
+    ProcedureParameter.optional(6, "path_regex", DataTypes.StringType, "*/*/*")
+  )
+
+  private val OUTPUT_TYPE_ALL: StructType = StructType(Array[StructField](
+    StructField("partition", DataTypes.StringType, nullable = true, Metadata.empty),
+    StructField("file_id", DataTypes.StringType, nullable = true, Metadata.empty),
+    StructField("base_instant", DataTypes.StringType, nullable = true, Metadata.empty),
+    StructField("data_file", DataTypes.StringType, nullable = true, Metadata.empty),
+    StructField("data_file_size", DataTypes.LongType, nullable = true, Metadata.empty),
+    StructField("num_delta_files", DataTypes.LongType, nullable = true, Metadata.empty),
+    StructField("total_delta_file_size", DataTypes.LongType, nullable = true, Metadata.empty),
+    StructField("delta_files", DataTypes.StringType, nullable = true, Metadata.empty)
+  ))
+
+  private val PARAMETERS_LATEST: Array[ProcedureParameter] = Array[ProcedureParameter](
+    ProcedureParameter.required(0, "table", DataTypes.StringType, None),
+    ProcedureParameter.optional(1, "max_instant", DataTypes.StringType, ""),
+    ProcedureParameter.optional(2, "include_max", DataTypes.BooleanType, false),
+    ProcedureParameter.optional(3, "include_inflight", DataTypes.BooleanType, false),
+    ProcedureParameter.optional(4, "exclude_compaction", DataTypes.BooleanType, false),
+    ProcedureParameter.optional(5, "limit", DataTypes.IntegerType, 10),
+    ProcedureParameter.required(6, "partition_path", DataTypes.StringType, None),
+    ProcedureParameter.optional(7, "merge", DataTypes.BooleanType, true)
+  )
+
+  private val OUTPUT_TYPE_LATEST: StructType = StructType(Array[StructField](
+    StructField("partition", DataTypes.StringType, nullable = true, Metadata.empty),
+    StructField("file_id", DataTypes.StringType, nullable = true, Metadata.empty),
+    StructField("base_instant", DataTypes.StringType, nullable = true, Metadata.empty),
+    StructField("data_file", DataTypes.StringType, nullable = true, Metadata.empty),
+    StructField("data_file_size", DataTypes.LongType, nullable = true, Metadata.empty),
+    StructField("num_delta_files", DataTypes.LongType, nullable = true, Metadata.empty),
+    StructField("total_delta_file_size", DataTypes.LongType, nullable = true, Metadata.empty),
+    StructField("delta_size_compaction_scheduled", DataTypes.LongType, nullable = true, Metadata.empty),
+    StructField("delta_size_compaction_unscheduled", DataTypes.LongType, nullable = true, Metadata.empty),
+    StructField("delta_to_base_ratio_compaction_scheduled", DataTypes.DoubleType, nullable = true, Metadata.empty),
+    StructField("delta_to_base_ratio_compaction_unscheduled", DataTypes.DoubleType, nullable = true, Metadata.empty),
+    StructField("delta_files_compaction_scheduled", DataTypes.StringType, nullable = true, Metadata.empty),
+    StructField("delta_files_compaction_unscheduled", DataTypes.StringType, nullable = true, Metadata.empty)
+  ))
+
+  private def buildFileSystemView(table: Option[Any],
+                                  globRegex: String,
+                                  maxInstant: String,
+                                  includeMaxInstant: Boolean,
+                                  includeInflight: Boolean,
+                                  excludeCompaction: Boolean): HoodieTableFileSystemView = {
+    val basePath = getBasePath(table)
+    val metaClient = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build
+    val fs = metaClient.getFs
+    val globPath = String.format("%s/%s/*", basePath, globRegex)
+    val statuses = FSUtils.getGlobStatusExcludingMetaFolder(fs, new Path(globPath))
+    var timeline: HoodieTimeline = if (excludeCompaction) {
+      metaClient.getActiveTimeline.getCommitsTimeline
+    } else {
+      metaClient.getActiveTimeline.getWriteTimeline
+    }
+    if (!includeInflight) {
+      timeline = timeline.filterCompletedInstants()
+    }
+    var instants = timeline.getInstants.iterator().asScala
+    if (maxInstant.nonEmpty) {
+      val predicate = if (includeMaxInstant) {
+        HoodieTimeline.GREATER_THAN_OR_EQUALS
+      } else {
+        HoodieTimeline.GREATER_THAN
+      }
+      instants = instants.filter(instant => predicate.test(maxInstant, instant.getTimestamp))
+    }
+
+    val details = new Function[HoodieInstant, org.apache.hudi.common.util.Option[Array[Byte]]]
+      with java.io.Serializable {
+      override def apply(instant: HoodieInstant): util.Option[Array[Byte]] = {
+        metaClient.getActiveTimeline.getInstantDetails(instant)
+      }
+    }
+    val filteredTimeline = new HoodieDefaultTimeline(Lists.newArrayList(instants.asJava).stream(), details)
+    new HoodieTableFileSystemView(metaClient, filteredTimeline, statuses.toArray(new Array[FileStatus](0)))
+  }
+
+  private def showAllFileSlices(fsView: HoodieTableFileSystemView): java.util.List[Row] = {
+    val rows: java.util.List[Row] = Lists.newArrayList()
+    fsView.getAllFileGroups.iterator().asScala.foreach(fg => {
+      fg.getAllFileSlices.iterator().asScala.foreach(fs => {
+        val fileId = fg.getFileGroupId.getFileId
+        var baseFilePath = ""
+        var baseFileSize = 0L
+        if (fs.getBaseFile.isPresent) {
+          baseFilePath = fs.getBaseFile.get.getPath
+          baseFileSize = fs.getBaseFile.get.getFileSize
+        }
+        val numLogFiles = fs.getLogFiles.count()
+        val sumLogFileSize = fs.getLogFiles.iterator().asScala.map(_.getFileSize).sum
+        val logFiles = fs.getLogFiles.collect(Collectors.toList[HoodieLogFile]).toString
+
+        rows.add(Row(fg.getPartitionPath, fileId, fs.getBaseInstantTime, baseFilePath, baseFileSize, numLogFiles,
+          sumLogFileSize, logFiles))
+      })
+    })
+    rows
+  }
+
+  private def showLatestFileSlices(fsView: HoodieTableFileSystemView,
+                                   table: Option[Any],
+                                   partition: String,
+                                   maxInstant: String,
+                                   merge: Boolean): java.util.List[Row] = {
+    var fileSliceStream: java.util.stream.Stream[FileSlice] = null
+    if (!merge) {
+      fileSliceStream = fsView.getLatestFileSlices(partition)
+    } else {
+      fileSliceStream = fsView.getLatestMergedFileSlicesBeforeOrOn(partition, if (maxInstant.isEmpty) {
+        val basePath = getBasePath(table)
+        val metaClient = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build
+        metaClient.getActiveTimeline.filterCompletedAndCompactionInstants().lastInstant().get().getTimestamp
+      } else {
+        maxInstant
+      })
+    }
+    val rows: java.util.List[Row] = Lists.newArrayList()
+    fileSliceStream.iterator().asScala.foreach {
+      fs => {
+        val fileId = fs.getFileId
+        val baseInstantTime = fs.getBaseInstantTime
+        var baseFilePath = ""
+        var baseFileSize = 0L
+        if (fs.getBaseFile.isPresent) {
+          baseFilePath = fs.getBaseFile.get.getPath
+          baseFileSize = fs.getBaseFile.get.getFileSize
+        }
+        val numLogFiles = fs.getLogFiles.count()
+        val sumLogFileSize = fs.getLogFiles.iterator().asScala.map(_.getFileSize).sum
+        val logFilesScheduledForCompactionTotalSize = fs.getLogFiles.iterator().asScala
+          .filter(logFile => logFile.getBaseCommitTime.equals(fs.getBaseInstantTime))
+          .map(_.getFileSize).sum
+        val logFilesUnscheduledTotalSize = fs.getLogFiles.iterator().asScala
+          .filter(logFile => !logFile.getBaseCommitTime.equals(fs.getBaseInstantTime))
+          .map(_.getFileSize).sum
+        val logSelectedForCompactionToBaseRatio = if (baseFileSize > 0) {
+          logFilesScheduledForCompactionTotalSize / (baseFileSize * 1.0)
+        } else {
+          -1
+        }
+        val logUnscheduledToBaseRatio = if (baseFileSize > 0) {
+          logFilesUnscheduledTotalSize / (baseFileSize * 1.0)
+        } else {
+          -1
+        }
+        val logFilesCommitTimeEqualInstantTime = fs.getLogFiles.iterator().asScala
+          .filter(logFile => logFile.getBaseCommitTime.equals(fs.getBaseInstantTime))
+          .mkString("[", ",", "]")
+        val logFilesCommitTimeNonEqualInstantTime = fs.getLogFiles.iterator().asScala
+          .filter(logFile => !logFile.getBaseCommitTime.equals(fs.getBaseInstantTime))
+          .mkString("[", ",", "]")
+        rows.add(Row(partition, fileId, baseInstantTime, baseFilePath, baseFileSize, numLogFiles, sumLogFileSize,
+          logFilesScheduledForCompactionTotalSize, logFilesUnscheduledTotalSize, logSelectedForCompactionToBaseRatio,
+          logUnscheduledToBaseRatio, logFilesCommitTimeEqualInstantTime, logFilesCommitTimeNonEqualInstantTime
+        ))
+      }
+    }
+    rows
+  }
+
+  override def parameters: Array[ProcedureParameter] = if (showLatest) {
+    PARAMETERS_LATEST
+  } else {
+    PARAMETERS_ALL
+  }
+
+  override def outputType: StructType = if (showLatest) {
+    OUTPUT_TYPE_LATEST
+  } else {
+    OUTPUT_TYPE_ALL
+  }
+
+  override def call(args: ProcedureArgs): Seq[Row] = {
+    super.checkArgs(parameters, args)
+    val table = getArgValueOrDefault(args, parameters(0))
+    val maxInstant = getArgValueOrDefault(args, parameters(1)).get.asInstanceOf[String]
+    val includeMax = getArgValueOrDefault(args, parameters(2)).get.asInstanceOf[Boolean]
+    val includeInflight = getArgValueOrDefault(args, parameters(3)).get.asInstanceOf[Boolean]
+    val excludeCompaction = getArgValueOrDefault(args, parameters(4)).get.asInstanceOf[Boolean]
+    val limit = getArgValueOrDefault(args, parameters(5)).get.asInstanceOf[Int]
+    val rows: java.util.List[Row] = if (!showLatest) {
+      val globRegex = getArgValueOrDefault(args, parameters(6)).get.asInstanceOf[String]
+      val fsView = buildFileSystemView(table, globRegex, maxInstant, includeMax, includeInflight, excludeCompaction)
+      showAllFileSlices(fsView)
+    } else {
+      val partitionPath = getArgValueOrDefault(args, parameters(6)).get.asInstanceOf[String]
+      val merge = getArgValueOrDefault(args, parameters(7)).get.asInstanceOf[Boolean]
+      val fsView = buildFileSystemView(table, partitionPath, maxInstant, includeMax, includeInflight, excludeCompaction)
+      showLatestFileSlices(fsView, table, partitionPath, maxInstant, merge)
+    }
+    rows.stream().limit(limit).toArray().map(r => r.asInstanceOf[Row]).toList
+  }
+
+  override def build: Procedure = new ShowFileSystemViewProcedure(showLatest)
+}
+
+object ShowAllFileSystemViewProcedure {
+  val NAME = "show_fsview_all"
+
+  def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
+    override def get() = new ShowFileSystemViewProcedure(false)
+  }
+}
+
+object ShowLatestFileSystemViewProcedure {
+  val NAME = "show_fsview_latest"
+
+  def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
+    override def get(): ProcedureBuilder = new ShowFileSystemViewProcedure(true)
+  }
+}
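
A note on the merge flag above: when merge is true (the default), the procedure
uses fsView.getLatestMergedFileSlicesBeforeOrOn, falling back to the last
completed or compaction instant when max_instant is empty; when false, it uses
the unmerged fsView.getLatestFileSlices. A sketch of both modes (table name
illustrative):

    // merged view: log files folded onto the latest base file slice
    spark.sql("call show_fsview_latest(table => 'h0', partition_path => 'ts=1000', merge => true)").show(false)

    // unmerged view: the latest raw file slice of each file group
    spark.sql("call show_fsview_latest(table => 'h0', partition_path => 'ts=1000', merge => false)").show(false)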
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestFsViewProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestFsViewProcedure.scala
new file mode 100644
index 0000000000..69d08e37df
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestFsViewProcedure.scala
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.procedure
+
+import org.apache.spark.sql.hudi.HoodieSparkSqlTestBase
+
+class TestFsViewProcedure extends HoodieSparkSqlTestBase {
+  test("Test Call show_fsview_all Procedure") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      // create table
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  id int,
+           |  name string,
+           |  price double,
+           |  ts long
+           |) using hudi
+           | partitioned by (ts)
+           | location '${tmp.getCanonicalPath}/$tableName'
+           | tblproperties (
+           |  primaryKey = 'id',
+           |  preCombineField = 'ts'
+           | )
+       """.stripMargin)
+      // insert data to table
+      spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000")
+      spark.sql(s"insert into $tableName select 2, 'a2', 20, 1500")
+
+      // Check required fields
+      checkExceptionContain(s"""call show_fsview_all(limit => 10)""")(
+        s"Argument: table is required")
+
+      // collect result for table
+      val result = spark.sql(
+        s"""call show_fsview_all(table => '$tableName', path_regex => '*/', limit => 10)""".stripMargin).collect()
+      assertResult(2) {
+        result.length
+      }
+    }
+  }
+
+ test("Test Call show_fsview_latest Procedure") {
+ withTempDir { tmp =>
+ val tableName = generateTableName
+ // create table
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int,
+ | name string,
+ | price double,
+ | ts long
+ |) using hudi
+ | partitioned by (ts)
+ | location '${tmp.getCanonicalPath}/$tableName'
+ | tblproperties (
+ | type = 'mor',
+ | primaryKey = 'id',
+ | preCombineField = 'ts'
+ | )
+ """.stripMargin)
+ // insert data to table
+ spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000")
+ spark.sql(s"insert into $tableName select 2, 'a2', 20, 1500")
+
+ // Check required fields
+ checkExceptionContain(s"""call show_fsview_latest(limit => 10)""")(
+ s"Argument: table is required")
+
+ // collect result for table
+ val result = spark.sql(
+ s"""call show_fsview_latest(table => '$tableName', partition_path =>
'ts=1000', limit => 10)""".stripMargin).collect()
+ assertResult(1) {
+ result.length
+ }
+ }
+ }
+}