This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 37564b4fd68 [HUDI-7845] Call show_fsview_latest procedure support path_regex (#11418)
37564b4fd68 is described below

commit 37564b4fd68777fd0b1f553237066a07060aa1d6
Author: Zouxxyy <zouxinyu....@alibaba-inc.com>
AuthorDate: Sun Jun 9 09:11:46 2024 +0800

    [HUDI-7845] Call show_fsview_latest procedure support path_regex (#11418)
---
 .../table/view/AbstractTableFileSystemView.java    |  13 +++
 .../hudi/command/procedures/BaseProcedure.scala    |   5 +
 .../procedures/ShowFileSystemViewProcedure.scala   | 105 ++++++++++++---------
 .../sql/hudi/procedure/TestFsViewProcedure.scala   |  86 ++++++++++++++++-
 4 files changed, 164 insertions(+), 45 deletions(-)
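
In short: show_fsview_latest previously took a single required partition_path; with this change it lists partitions through the file-system view, accepts an optional path_regex glob that can cover several partitions at once, keeps partition_path for backward compatibility, and throws a HoodieException when both arguments are passed. A usage sketch mirroring the tests added below (the table and partition names are illustrative, not part of the commit):

    // list latest file slices across all partitions (new default)
    spark.sql("call show_fsview_latest(table => 'hudi_tbl')").show()
    // restrict to partitions matching a glob, e.g. every hour under day=d1
    spark.sql("call show_fsview_latest(table => 'hudi_tbl', path_regex => 'day=d1/*/')").show()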

diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
index 550082b0aa1..90f48b660c3 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
@@ -672,6 +672,19 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
     }
   }
 
+  public final List<String> getPartitionNames() {
+    try {
+      readLock.lock();
+      return fetchAllStoredFileGroups()
+          .filter(fg -> !isFileGroupReplaced(fg))
+          .map(HoodieFileGroup::getPartitionPath)
+          .distinct()
+          .collect(Collectors.toList());
+    } finally {
+      readLock.unlock();
+    }
+  }
+
   @Override
   public final Stream<Pair<String, CompactionOperation>> getPendingLogCompactionOperations() {
     try {
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/BaseProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/BaseProcedure.scala
index b0ffc0cb64e..777d1937c98 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/BaseProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/BaseProcedure.scala
@@ -76,6 +76,11 @@ abstract class BaseProcedure extends Procedure {
     }
   }
 
+  protected def isArgDefined(args: ProcedureArgs, parameter: ProcedureParameter): Boolean = {
+    val paramKey = getParamKey(parameter, args.isNamedArgs)
+    args.map.containsKey(paramKey)
+  }
+
   protected def getInternalRowValue(row: InternalRow, index: Int, dataType: DataType): Any = {
     dataType match {
       case StringType => row.getString(index)
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowFileSystemViewProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowFileSystemViewProcedure.scala
index c7d11f4c091..f19cd105c81 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowFileSystemViewProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowFileSystemViewProcedure.scala
@@ -22,17 +22,23 @@ import org.apache.hudi.common.model.{FileSlice, HoodieLogFile}
 import org.apache.hudi.common.table.timeline.{CompletionTimeQueryView, HoodieDefaultTimeline, HoodieInstant, HoodieTimeline}
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView
 import org.apache.hudi.common.util
+import org.apache.hudi.exception.HoodieException
+import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.storage.StoragePath
 
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}
 
 import java.util.function.{Function, Supplier}
-import java.util.stream.Collectors
+import java.util.stream.{Collectors, Stream => JStream}
+import java.util.{ArrayList => JArrayList, List => JList}
 
 import scala.collection.JavaConverters._
 
 class ShowFileSystemViewProcedure(showLatest: Boolean) extends BaseProcedure with ProcedureBuilder {
+
+  private val ALL_PARTITIONS = "ALL_PARTITIONS"
+
   private val PARAMETERS_ALL: Array[ProcedureParameter] = Array[ProcedureParameter](
     ProcedureParameter.required(0, "table", DataTypes.StringType),
     ProcedureParameter.optional(1, "max_instant", DataTypes.StringType, ""),
@@ -40,7 +46,7 @@ class ShowFileSystemViewProcedure(showLatest: Boolean) extends BaseProcedure wit
     ProcedureParameter.optional(3, "include_in_flight", DataTypes.BooleanType, false),
     ProcedureParameter.optional(4, "exclude_compaction", DataTypes.BooleanType, false),
     ProcedureParameter.optional(5, "limit", DataTypes.IntegerType, 10),
-    ProcedureParameter.optional(6, "path_regex", DataTypes.StringType, "*/*/*")
+    ProcedureParameter.optional(6, "path_regex", DataTypes.StringType, ALL_PARTITIONS)
   )
 
   private val OUTPUT_TYPE_ALL: StructType = StructType(Array[StructField](
@@ -54,16 +60,11 @@ class ShowFileSystemViewProcedure(showLatest: Boolean) extends BaseProcedure wit
     StructField("delta_files", DataTypes.StringType, nullable = true, Metadata.empty)
   ))
 
-  private val PARAMETERS_LATEST: Array[ProcedureParameter] = Array[ProcedureParameter](
-    ProcedureParameter.required(0, "table", DataTypes.StringType),
-    ProcedureParameter.optional(1, "max_instant", DataTypes.StringType, ""),
-    ProcedureParameter.optional(2, "include_max", DataTypes.BooleanType, false),
-    ProcedureParameter.optional(3, "include_inflight", DataTypes.BooleanType, false),
-    ProcedureParameter.optional(4, "exclude_compaction", DataTypes.BooleanType, false),
-    ProcedureParameter.optional(5, "limit", DataTypes.IntegerType, 10),
-    ProcedureParameter.required(6, "partition_path", DataTypes.StringType),
-    ProcedureParameter.optional(7, "merge", DataTypes.BooleanType, true)
-
+  private val PARAMETERS_LATEST: Array[ProcedureParameter] =
+    PARAMETERS_ALL ++ Array[ProcedureParameter](
+      // Keep it for compatibility with older versions; `path_regex` can replace it
+      ProcedureParameter.optional(7, "partition_path", DataTypes.StringType, ALL_PARTITIONS),
+      ProcedureParameter.optional(8, "merge", DataTypes.BooleanType, true)
   )
 
   private val OUTPUT_TYPE_LATEST: StructType = StructType(Array[StructField](
@@ -82,17 +83,16 @@ class ShowFileSystemViewProcedure(showLatest: Boolean) extends BaseProcedure wit
     StructField("delta_files_compaction_unscheduled", DataTypes.StringType, nullable = true, Metadata.empty)
   ))
 
-  private def buildFileSystemView(table: Option[Any],
+  private def buildFileSystemView(basePath: String,
+                                  metaClient: HoodieTableMetaClient,
                                   globRegex: String,
                                   maxInstant: String,
                                   includeMaxInstant: Boolean,
                                   includeInflight: Boolean,
                                   excludeCompaction: Boolean
                                  ): HoodieTableFileSystemView = {
-    val basePath = getBasePath(table)
-    val metaClient = createMetaClient(jsc, basePath)
     val storage = metaClient.getStorage
-    val statuses = if (globRegex == PARAMETERS_ALL.apply(6).default) {
       FSUtils.getAllDataPathInfo(storage, new StoragePath(basePath))
     } else {
       val globPath = String.format("%s/%s/*", basePath, globRegex)
@@ -124,12 +124,12 @@ class ShowFileSystemViewProcedure(showLatest: Boolean) extends BaseProcedure wit
     }
 
     val filteredTimeline = new HoodieDefaultTimeline(
-      new java.util.ArrayList[HoodieInstant](instants.toList.asJava).stream(), details)
+      new JArrayList[HoodieInstant](instants.toList.asJava).stream(), details)
     new HoodieTableFileSystemView(metaClient, filteredTimeline, statuses)
   }
 
-  private def showAllFileSlices(fsView: HoodieTableFileSystemView): java.util.List[Row] = {
-    val rows: java.util.List[Row] = new java.util.ArrayList[Row]
+  private def showAllFileSlices(fsView: HoodieTableFileSystemView): JList[Row] = {
+    val rows: JList[Row] = new JArrayList[Row]
     fsView.getAllFileGroups.iterator().asScala.foreach(fg => {
       fg.getAllFileSlices.iterator().asScala.foreach(fs => {
         val fileId = fg.getFileGroupId.getFileId
@@ -150,25 +150,19 @@ class ShowFileSystemViewProcedure(showLatest: Boolean) extends BaseProcedure wit
     rows
   }
 
-  private def showLatestFileSlices(fsView: HoodieTableFileSystemView,
-                                   table: Option[Any],
-                                   partition: String,
+  private def showLatestFileSlices(metaClient: HoodieTableMetaClient,
+                                   fsView: HoodieTableFileSystemView,
+                                   partitions: Seq[String],
                                    maxInstant: String,
-                                   merge: Boolean): java.util.List[Row] = {
-    var fileSliceStream: java.util.stream.Stream[FileSlice] = null
-    val basePath = getBasePath(table)
-    val metaClient = createMetaClient(jsc, basePath)
+                                   merge: Boolean): JList[Row] = {
+    var fileSliceStream: JStream[FileSlice] = JStream.empty()
     val completionTimeQueryView = new CompletionTimeQueryView(metaClient)
-    if (!merge) {
-      fileSliceStream = fsView.getLatestFileSlices(partition)
+    if (merge) {
+      partitions.foreach(p => fileSliceStream = JStream.concat(fileSliceStream, fsView.getLatestMergedFileSlicesBeforeOrOn(p, maxInstant)))
     } else {
-      fileSliceStream = fsView.getLatestMergedFileSlicesBeforeOrOn(partition, if (maxInstant.isEmpty) {
-        metaClient.getActiveTimeline.filterCompletedAndCompactionInstants().lastInstant().get().getTimestamp
-      } else {
-        maxInstant
-      })
+      partitions.foreach(p => fileSliceStream = JStream.concat(fileSliceStream, fsView.getLatestFileSlices(p)))
     }
-    val rows: java.util.List[Row] = new java.util.ArrayList[Row]
+    val rows = new JArrayList[Row]
     fileSliceStream.iterator().asScala.foreach {
       fs => {
         val fileId = fs.getFileId
@@ -204,7 +198,7 @@ class ShowFileSystemViewProcedure(showLatest: Boolean) extends BaseProcedure wit
         val logFilesCommitTimeNonEqualInstantTime = fs.getLogFiles.iterator().asScala
           .filter(logFile => !logFile.getDeltaCommitTime.equals(fs.getBaseInstantTime))
           .mkString("[", ",", "]")
-        rows.add(Row(partition, fileId, baseInstantTime, baseFilePath, baseFileSize, numLogFiles, sumLogFileSize,
+        rows.add(Row(fs.getFileGroupId.getPartitionPath, fileId, baseInstantTime, baseFilePath, baseFileSize, numLogFiles, sumLogFileSize,
           logFilesScheduledForCompactionTotalSize, logFilesUnscheduledTotalSize, logSelectedForCompactionToBaseRatio,
           logUnscheduledToBaseRatio, logFilesCommitTimeEqualInstantTime, logFilesCommitTimeNonEqualInstantTime
         ))
@@ -234,15 +228,40 @@ class ShowFileSystemViewProcedure(showLatest: Boolean) extends BaseProcedure wit
     val includeInflight = getArgValueOrDefault(args, parameters(3)).get.asInstanceOf[Boolean]
     val excludeCompaction = getArgValueOrDefault(args, parameters(4)).get.asInstanceOf[Boolean]
     val limit = getArgValueOrDefault(args, parameters(5)).get.asInstanceOf[Int]
-    val rows: java.util.List[Row] = if (!showLatest) {
-      val globRegex = getArgValueOrDefault(args, parameters(6)).get.asInstanceOf[String]
-      val fsView = buildFileSystemView(table, globRegex, maxInstant, includeMax, includeInflight, excludeCompaction)
-      showAllFileSlices(fsView)
+    val globRegex = if (showLatest) {
+      val isPathRegexDefined = isArgDefined(args, parameters(6))
+      val isPartitionPathDefined = isArgDefined(args, parameters(7))
+      if (isPathRegexDefined && isPartitionPathDefined) {
+        throw new HoodieException("path_regex and partition_path cannot be used together")
+      }
+      if (isPathRegexDefined) {
+        getArgValueOrDefault(args, parameters(6)).get.asInstanceOf[String]
+      } else {
+        getArgValueOrDefault(args, parameters(7)).get.asInstanceOf[String]
+      }
+    } else {
+      getArgValueOrDefault(args, parameters(6)).get.asInstanceOf[String]
+    }
+    val basePath = getBasePath(table)
+    val metaClient = createMetaClient(jsc, basePath)
+    val fsView = buildFileSystemView(basePath, metaClient, globRegex, maxInstant, includeMax, includeInflight, excludeCompaction)
+    val rows = if (showLatest) {
+      val merge = getArgValueOrDefault(args, parameters(8)).get.asInstanceOf[Boolean]
+      val maxInstantForMerge = if (merge && maxInstant.isEmpty) {
+        val lastInstant = metaClient.getActiveTimeline.filterCompletedAndCompactionInstants().lastInstant()
+        if (lastInstant.isPresent) {
+          lastInstant.get().getTimestamp
+        } else {
+          // scalastyle:off return
+          return Seq.empty
+          // scalastyle:on return
+        }
+      } else {
+        maxInstant
+      }
+      showLatestFileSlices(metaClient, fsView, fsView.getPartitionNames.asScala.toSeq, maxInstantForMerge, merge)
     } else {
-      val partitionPath = getArgValueOrDefault(args, parameters(6)).get.asInstanceOf[String]
-      val merge = getArgValueOrDefault(args, parameters(7)).get.asInstanceOf[Boolean]
-      val fsView = buildFileSystemView(table, partitionPath, maxInstant, includeMax, includeInflight, excludeCompaction)
-      showLatestFileSlices(fsView, table, partitionPath, maxInstant, merge)
+      showAllFileSlices(fsView)
     }
     rows.stream().limit(limit).toArray().map(r => r.asInstanceOf[Row]).toList
   }
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestFsViewProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestFsViewProcedure.scala
index 9de1f1b0ee8..69b07f2c9cd 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestFsViewProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestFsViewProcedure.scala
@@ -118,7 +118,7 @@ class TestFsViewProcedure extends HoodieSparkProcedureTestBase {
            | )
        """.stripMargin)
       // insert data to table
-      spark.sql(s"insert into $tableName select 1, 'a1', 10, 'f11', 
'f21',1000")
+      spark.sql(s"insert into $tableName select 1, 'a1', 10, 'f11', 'f21', 
1000")
       spark.sql(s"insert into $tableName select 2, 'a2', 20, 'f12', 'f22', 
1500")
 
       // Check required fields
@@ -146,7 +146,6 @@ class TestFsViewProcedure extends HoodieSparkProcedureTestBase {
     }
   }
 
-
   test("Test Call show_fsview_latest Procedure") {
     withTempDir { tmp =>
       val tableName = generateTableName
@@ -183,4 +182,87 @@ class TestFsViewProcedure extends HoodieSparkProcedureTestBase {
       }
     }
   }
+
+  test("Test Call show_fsview_latest Procedure with NonPartition") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      // create table
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  id int,
+           |  name string,
+           |  price double,
+           |  ts long
+           |) using hudi
+           | location '${tmp.getCanonicalPath}/$tableName'
+           | tblproperties (
+           |  primaryKey = 'id',
+           |  preCombineField = 'ts',
+           |  'hoodie.parquet.small.file.limit' = '0'
+           | )
+       """.stripMargin)
+      spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000")
+      spark.sql(s"insert into $tableName select 2, 'a2', 20, 1500")
+      spark.sql(s"insert into $tableName select 2, 'a3', 20, 1500")
+
+      val result = spark.sql(
+        s"""call show_fsview_latest(table => '$tableName', limit => 
10)""".stripMargin).collect()
+      assertResult(2) {
+        result.length
+      }
+    }
+  }
+
+  test("Test Call show_fsview_latest Procedure with path_regex") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  id int,
+           |  name string,
+           |  ts long,
+           |  day string,
+           |  hh string
+           |) using hudi
+           | partitioned by(day, hh)
+           | location '${tmp.getCanonicalPath}/$tableName'
+           | tblproperties (
+           |  primaryKey = 'id',
+           |  preCombineField = 'ts',
+           |  'hoodie.parquet.small.file.limit' = '0'
+           | )
+       """.stripMargin)
+
+      val result1 = spark.sql(s"call show_fsview_all(table => 
'$tableName')").collect()
+      assertResult(0) {
+        result1.length
+      }
+
+      spark.sql(s"insert into $tableName select 1, 'a1', 1001, 'd1', 'h1'")
+      spark.sql(s"insert into $tableName select 1, 'a2', 1002, 'd1', 'h1'")
+      spark.sql(s"insert into $tableName select 2, 'a3', 1003, 'd1', 'h2'")
+      spark.sql(s"insert into $tableName select 3, 'a4', 1004, 'd1', 'h2'")
+      spark.sql(s"insert into $tableName select 4, 'a5', 1005, 'd2', 'h1'")
+
+      val result2 = spark.sql(
+        s"call show_fsview_latest(table => '$tableName')").collect()
+      assertResult(4) {
+        result2.length
+      }
+
+      val result3 = spark.sql(
+        s"call show_fsview_latest(table => '$tableName', path_regex => 
'day=d1/*/')").collect()
+      assertResult(3) {
+        result3.length
+      }
+
+      val result4 = spark.sql(
+        s"call show_fsview_latest(table => '$tableName', path_regex => 
'day=d1/hh=h2/')").collect()
+      assertResult(2) {
+        result4.length
+      }
+    }
+  }
 }
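
For completeness, a hedged sketch of the compatibility behaviour implied by the guard above (table and partition values are illustrative): the legacy single-partition form still works, while mixing the two arguments fails fast.

    // legacy form, kept for compatibility with older callers
    spark.sql("call show_fsview_latest(table => 'hudi_tbl', partition_path => 'day=d1/hh=h2')").show()
    // rejected with: "path_regex and partition_path cannot be used together"
    // spark.sql("call show_fsview_latest(table => 'hudi_tbl', path_regex => 'day=d1/*/', partition_path => 'day=d1/hh=h2')").show()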
