This is an automated email from the ASF dual-hosted git repository. danny0405 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push: new 37564b4fd68 [HUDI-7845] Call show_fsview_latest procedure support path_regex (#11418) 37564b4fd68 is described below commit 37564b4fd68777fd0b1f553237066a07060aa1d6 Author: Zouxxyy <zouxinyu....@alibaba-inc.com> AuthorDate: Sun Jun 9 09:11:46 2024 +0800 [HUDI-7845] Call show_fsview_latest procedure support path_regex (#11418) --- .../table/view/AbstractTableFileSystemView.java | 13 +++ .../hudi/command/procedures/BaseProcedure.scala | 5 + .../procedures/ShowFileSystemViewProcedure.scala | 105 ++++++++++++--------- .../sql/hudi/procedure/TestFsViewProcedure.scala | 86 ++++++++++++++++- 4 files changed, 164 insertions(+), 45 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java index 550082b0aa1..90f48b660c3 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java @@ -672,6 +672,19 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV } } + public final List<String> getPartitionNames() { + try { + readLock.lock(); + return fetchAllStoredFileGroups() + .filter(fg -> !isFileGroupReplaced(fg)) + .map(HoodieFileGroup::getPartitionPath) + .distinct() + .collect(Collectors.toList()); + } finally { + readLock.unlock(); + } + } + @Override public final Stream<Pair<String, CompactionOperation>> getPendingLogCompactionOperations() { try { diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/BaseProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/BaseProcedure.scala index b0ffc0cb64e..777d1937c98 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/BaseProcedure.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/BaseProcedure.scala @@ -76,6 +76,11 @@ abstract class BaseProcedure extends Procedure { } } + protected def isArgDefined(args: ProcedureArgs, parameter: ProcedureParameter): Boolean = { + val paramKey = getParamKey(parameter, args.isNamedArgs) + args.map.containsKey(paramKey) + } + protected def getInternalRowValue(row: InternalRow, index: Int, dataType: DataType): Any = { dataType match { case StringType => row.getString(index) diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowFileSystemViewProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowFileSystemViewProcedure.scala index c7d11f4c091..f19cd105c81 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowFileSystemViewProcedure.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowFileSystemViewProcedure.scala @@ -22,17 +22,23 @@ import org.apache.hudi.common.model.{FileSlice, HoodieLogFile} import org.apache.hudi.common.table.timeline.{CompletionTimeQueryView, HoodieDefaultTimeline, HoodieInstant, HoodieTimeline} import org.apache.hudi.common.table.view.HoodieTableFileSystemView import org.apache.hudi.common.util +import org.apache.hudi.exception.HoodieException +import org.apache.hudi.common.table.HoodieTableMetaClient import org.apache.hudi.storage.StoragePath import org.apache.spark.sql.Row import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType} import java.util.function.{Function, Supplier} -import java.util.stream.Collectors +import java.util.stream.{Collectors, Stream => JStream} +import java.util.{ArrayList => JArrayList, List => JList} import scala.collection.JavaConverters._ class ShowFileSystemViewProcedure(showLatest: Boolean) extends BaseProcedure with ProcedureBuilder { + + private val ALL_PARTITIONS = "ALL_PARTITIONS" + private val PARAMETERS_ALL: Array[ProcedureParameter] = Array[ProcedureParameter]( ProcedureParameter.required(0, "table", DataTypes.StringType), ProcedureParameter.optional(1, "max_instant", DataTypes.StringType, ""), @@ -40,7 +46,7 @@ class ShowFileSystemViewProcedure(showLatest: Boolean) extends BaseProcedure wit ProcedureParameter.optional(3, "include_in_flight", DataTypes.BooleanType, false), ProcedureParameter.optional(4, "exclude_compaction", DataTypes.BooleanType, false), ProcedureParameter.optional(5, "limit", DataTypes.IntegerType, 10), - ProcedureParameter.optional(6, "path_regex", DataTypes.StringType, "*/*/*") + ProcedureParameter.optional(6, "path_regex", DataTypes.StringType, ALL_PARTITIONS) ) private val OUTPUT_TYPE_ALL: StructType = StructType(Array[StructField]( @@ -54,16 +60,11 @@ class ShowFileSystemViewProcedure(showLatest: Boolean) extends BaseProcedure wit StructField("delta_files", DataTypes.StringType, nullable = true, Metadata.empty) )) - private val PARAMETERS_LATEST: Array[ProcedureParameter] = Array[ProcedureParameter]( - ProcedureParameter.required(0, "table", DataTypes.StringType), - ProcedureParameter.optional(1, "max_instant", DataTypes.StringType, ""), - ProcedureParameter.optional(2, "include_max", DataTypes.BooleanType, false), - ProcedureParameter.optional(3, "include_inflight", DataTypes.BooleanType, false), - ProcedureParameter.optional(4, "exclude_compaction", DataTypes.BooleanType, false), - ProcedureParameter.optional(5, "limit", DataTypes.IntegerType, 10), - ProcedureParameter.required(6, "partition_path", DataTypes.StringType), - ProcedureParameter.optional(7, "merge", DataTypes.BooleanType, true) - + private val PARAMETERS_LATEST: Array[ProcedureParameter] = + PARAMETERS_ALL ++ Array[ProcedureParameter]( + // Keep it for compatibility with older version, `path_regex` can replace it + ProcedureParameter.optional(7, "partition_path", DataTypes.StringType, ALL_PARTITIONS), + ProcedureParameter.optional(8, "merge", DataTypes.BooleanType, true) ) private val OUTPUT_TYPE_LATEST: StructType = StructType(Array[StructField]( @@ -82,17 +83,16 @@ class ShowFileSystemViewProcedure(showLatest: Boolean) extends BaseProcedure wit StructField("delta_files_compaction_unscheduled", DataTypes.StringType, nullable = true, Metadata.empty) )) - private def buildFileSystemView(table: Option[Any], + private def buildFileSystemView(basePath: String, + metaClient: HoodieTableMetaClient, globRegex: String, maxInstant: String, includeMaxInstant: Boolean, includeInflight: Boolean, excludeCompaction: Boolean ): HoodieTableFileSystemView = { - val basePath = getBasePath(table) - val metaClient = createMetaClient(jsc, basePath) val storage = metaClient.getStorage - val statuses = if (globRegex == PARAMETERS_ALL.apply(6).default) { + val statuses = if (globRegex == ALL_PARTITIONS) { FSUtils.getAllDataPathInfo(storage, new StoragePath(basePath)) } else { val globPath = String.format("%s/%s/*", basePath, globRegex) @@ -124,12 +124,12 @@ class ShowFileSystemViewProcedure(showLatest: Boolean) extends BaseProcedure wit } val filteredTimeline = new HoodieDefaultTimeline( - new java.util.ArrayList[HoodieInstant](instants.toList.asJava).stream(), details) + new JArrayList[HoodieInstant](instants.toList.asJava).stream(), details) new HoodieTableFileSystemView(metaClient, filteredTimeline, statuses) } - private def showAllFileSlices(fsView: HoodieTableFileSystemView): java.util.List[Row] = { - val rows: java.util.List[Row] = new java.util.ArrayList[Row] + private def showAllFileSlices(fsView: HoodieTableFileSystemView): JList[Row] = { + val rows: JList[Row] = new JArrayList[Row] fsView.getAllFileGroups.iterator().asScala.foreach(fg => { fg.getAllFileSlices.iterator().asScala.foreach(fs => { val fileId = fg.getFileGroupId.getFileId @@ -150,25 +150,19 @@ class ShowFileSystemViewProcedure(showLatest: Boolean) extends BaseProcedure wit rows } - private def showLatestFileSlices(fsView: HoodieTableFileSystemView, - table: Option[Any], - partition: String, + private def showLatestFileSlices(metaClient: HoodieTableMetaClient, + fsView: HoodieTableFileSystemView, + partitions: Seq[String], maxInstant: String, - merge: Boolean): java.util.List[Row] = { - var fileSliceStream: java.util.stream.Stream[FileSlice] = null - val basePath = getBasePath(table) - val metaClient = createMetaClient(jsc, basePath) + merge: Boolean): JList[Row] = { + var fileSliceStream: JStream[FileSlice] = JStream.empty() val completionTimeQueryView = new CompletionTimeQueryView(metaClient) - if (!merge) { - fileSliceStream = fsView.getLatestFileSlices(partition) + if (merge) { + partitions.foreach(p => fileSliceStream = JStream.concat(fileSliceStream, fsView.getLatestMergedFileSlicesBeforeOrOn(p, maxInstant))) } else { - fileSliceStream = fsView.getLatestMergedFileSlicesBeforeOrOn(partition, if (maxInstant.isEmpty) { - metaClient.getActiveTimeline.filterCompletedAndCompactionInstants().lastInstant().get().getTimestamp - } else { - maxInstant - }) + partitions.foreach(p => fileSliceStream = JStream.concat(fileSliceStream, fsView.getLatestFileSlices(p))) } - val rows: java.util.List[Row] = new java.util.ArrayList[Row] + val rows = new JArrayList[Row] fileSliceStream.iterator().asScala.foreach { fs => { val fileId = fs.getFileId @@ -204,7 +198,7 @@ class ShowFileSystemViewProcedure(showLatest: Boolean) extends BaseProcedure wit val logFilesCommitTimeNonEqualInstantTime = fs.getLogFiles.iterator().asScala .filter(logFile => !logFile.getDeltaCommitTime.equals(fs.getBaseInstantTime)) .mkString("[", ",", "]") - rows.add(Row(partition, fileId, baseInstantTime, baseFilePath, baseFileSize, numLogFiles, sumLogFileSize, + rows.add(Row(fs.getFileGroupId.getPartitionPath, fileId, baseInstantTime, baseFilePath, baseFileSize, numLogFiles, sumLogFileSize, logFilesScheduledForCompactionTotalSize, logFilesUnscheduledTotalSize, logSelectedForCompactionToBaseRatio, logUnscheduledToBaseRatio, logFilesCommitTimeEqualInstantTime, logFilesCommitTimeNonEqualInstantTime )) @@ -234,15 +228,40 @@ class ShowFileSystemViewProcedure(showLatest: Boolean) extends BaseProcedure wit val includeInflight = getArgValueOrDefault(args, parameters(3)).get.asInstanceOf[Boolean] val excludeCompaction = getArgValueOrDefault(args, parameters(4)).get.asInstanceOf[Boolean] val limit = getArgValueOrDefault(args, parameters(5)).get.asInstanceOf[Int] - val rows: java.util.List[Row] = if (!showLatest) { - val globRegex = getArgValueOrDefault(args, parameters(6)).get.asInstanceOf[String] - val fsView = buildFileSystemView(table, globRegex, maxInstant, includeMax, includeInflight, excludeCompaction) - showAllFileSlices(fsView) + val globRegex = if (showLatest) { + val isPathRegexDefined = isArgDefined(args, parameters(6)) + val isPartitionPathDefined = isArgDefined(args, parameters(7)) + if (isPathRegexDefined && isPartitionPathDefined) { + throw new HoodieException("path_regex and partition_path cannot be used together") + } + if (isPathRegexDefined) { + getArgValueOrDefault(args, parameters(6)).get.asInstanceOf[String] + } else { + getArgValueOrDefault(args, parameters(7)).get.asInstanceOf[String] + } + } else { + getArgValueOrDefault(args, parameters(6)).get.asInstanceOf[String] + } + val basePath = getBasePath(table) + val metaClient = createMetaClient(jsc, basePath) + val fsView = buildFileSystemView(basePath, metaClient, globRegex, maxInstant, includeMax, includeInflight, excludeCompaction) + val rows = if (showLatest) { + val merge = getArgValueOrDefault(args, parameters(8)).get.asInstanceOf[Boolean] + val maxInstantForMerge = if (merge && maxInstant.isEmpty) { + val lastInstant = metaClient.getActiveTimeline.filterCompletedAndCompactionInstants().lastInstant() + if (lastInstant.isPresent) { + lastInstant.get().getTimestamp + } else { + // scalastyle:off return + return Seq.empty + // scalastyle:on return + } + } else { + maxInstant + } + showLatestFileSlices(metaClient, fsView, fsView.getPartitionNames.asScala.toSeq, maxInstantForMerge, merge) } else { - val partitionPath = getArgValueOrDefault(args, parameters(6)).get.asInstanceOf[String] - val merge = getArgValueOrDefault(args, parameters(7)).get.asInstanceOf[Boolean] - val fsView = buildFileSystemView(table, partitionPath, maxInstant, includeMax, includeInflight, excludeCompaction) - showLatestFileSlices(fsView, table, partitionPath, maxInstant, merge) + showAllFileSlices(fsView) } rows.stream().limit(limit).toArray().map(r => r.asInstanceOf[Row]).toList } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestFsViewProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestFsViewProcedure.scala index 9de1f1b0ee8..69b07f2c9cd 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestFsViewProcedure.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestFsViewProcedure.scala @@ -118,7 +118,7 @@ class TestFsViewProcedure extends HoodieSparkProcedureTestBase { | ) """.stripMargin) // insert data to table - spark.sql(s"insert into $tableName select 1, 'a1', 10, 'f11', 'f21',1000") + spark.sql(s"insert into $tableName select 1, 'a1', 10, 'f11', 'f21', 1000") spark.sql(s"insert into $tableName select 2, 'a2', 20, 'f12', 'f22', 1500") // Check required fields @@ -146,7 +146,6 @@ class TestFsViewProcedure extends HoodieSparkProcedureTestBase { } } - test("Test Call show_fsview_latest Procedure") { withTempDir { tmp => val tableName = generateTableName @@ -183,4 +182,87 @@ class TestFsViewProcedure extends HoodieSparkProcedureTestBase { } } } + + test("Test Call show_fsview_latest Procedure with NonPartition") { + withTempDir { tmp => + val tableName = generateTableName + // create table + spark.sql( + s""" + |create table $tableName ( + | id int, + | name string, + | price double, + | ts long + |) using hudi + | location '${tmp.getCanonicalPath}/$tableName' + | tblproperties ( + | primaryKey = 'id', + | preCombineField = 'ts', + | 'hoodie.parquet.small.file.limit' = '0' + | ) + """.stripMargin) + spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000") + spark.sql(s"insert into $tableName select 2, 'a2', 20, 1500") + spark.sql(s"insert into $tableName select 2, 'a3', 20, 1500") + + val result = spark.sql( + s"""call show_fsview_latest(table => '$tableName', limit => 10)""".stripMargin).collect() + assertResult(2) { + result.length + } + } + } + + test("Test Call show_fsview_latest Procedure with path_regex") { + withTempDir { tmp => + val tableName = generateTableName + spark.sql( + s""" + |create table $tableName ( + | id int, + | name string, + | ts long, + | day string, + | hh string + |) using hudi + | partitioned by(day, hh) + | location '${tmp.getCanonicalPath}/$tableName' + | tblproperties ( + | primaryKey = 'id', + | preCombineField = 'ts', + | 'hoodie.parquet.small.file.limit' = '0' + | ) + """.stripMargin) + + val result1 = spark.sql(s"call show_fsview_all(table => '$tableName')").collect() + assertResult(0) { + result1.length + } + + spark.sql(s"insert into $tableName select 1, 'a1', 1001, 'd1', 'h1'") + spark.sql(s"insert into $tableName select 1, 'a2', 1002, 'd1', 'h1'") + spark.sql(s"insert into $tableName select 2, 'a3', 1003, 'd1', 'h2'") + spark.sql(s"insert into $tableName select 3, 'a4', 1004, 'd1', 'h2'") + spark.sql(s"insert into $tableName select 4, 'a5', 1005, 'd2', 'h1'") + + val result2 = spark.sql( + s"call show_fsview_latest(table => '$tableName')").collect() + assertResult(4) { + result2.length + } + + val result3 = spark.sql( + s"call show_fsview_latest(table => '$tableName', path_regex => 'day=d1/*/')").collect() + assertResult(3) { + result3.length + } + + val result4 = spark.sql( + s"call show_fsview_latest(table => '$tableName', path_regex => 'day=d1/hh=h2/')").collect() + assertResult(2) { + result4.length + } + } + } }