alexr17 commented on code in PR #13886:
URL: https://github.com/apache/hudi/pull/13886#discussion_r2393757057


##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ValidateAuditLockProcedure.scala:
##########
@@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.command.procedures
+
+import 
org.apache.hudi.client.transaction.lock.audit.StorageLockProviderAuditService
+import org.apache.hudi.storage.{StoragePath, StoragePathInfo}
+
+import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper}
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, 
StructType}
+
+import java.util.function.Supplier
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.util.{Failure, Success, Try}
+
+/**
+ * Spark SQL procedure for validating audit lock files for Hudi tables.
+ *
+ * This procedure validates the integrity and consistency of audit lock files
+ * generated by the storage lock audit service. It checks for various issues
+ * including file format, lock state transitions, and orphaned locks.
+ *
+ * Usage:
+ * {{{
+ * CALL validate_audit_lock(table => 'my_table')
+ * CALL validate_audit_lock(path => '/path/to/table')
+ * }}}
+ *
+ * The procedure reads audit files from:
+ * `{table_path}/.hoodie/.locks/audit/`
+ *
+ * @author Apache Hudi
+ * @since 1.0.0
+ */
+class ValidateAuditLockProcedure extends BaseProcedure with ProcedureBuilder {
+  private val PARAMETERS = Array[ProcedureParameter](
+    ProcedureParameter.optional(0, "table", DataTypes.StringType),
+    ProcedureParameter.optional(1, "path", DataTypes.StringType)
+  )
+
+  private val OUTPUT_TYPE = new StructType(Array[StructField](
+    StructField("table", DataTypes.StringType, nullable = false, 
Metadata.empty),
+    StructField("validation_result", DataTypes.StringType, nullable = false, 
Metadata.empty),
+    StructField("transactions_validated", DataTypes.IntegerType, nullable = 
false, Metadata.empty),
+    StructField("issues_found", DataTypes.IntegerType, nullable = false, 
Metadata.empty),
+    StructField("details", DataTypes.StringType, nullable = false, 
Metadata.empty)
+  ))
+
+  private val OBJECT_MAPPER = new ObjectMapper()
+
+  /**
+   * Represents a transaction window with start time, end time, and metadata.
+   */
+  case class TransactionWindow(
+    ownerId: String,
+    transactionStartTime: Long,
+    startTimestamp: Long,
+    endTimestamp: Option[Long],
+    lastExpirationTime: Option[Long],
+    filename: String
+  ) {
+    def effectiveEndTime: Long = 
endTimestamp.orElse(lastExpirationTime).getOrElse(startTimestamp)
+  }
+
+  /**
+   * Returns the procedure parameters definition.
+   *
+   * @return Array of parameters: table (optional String) and path (optional 
String)
+   */
+  def parameters: Array[ProcedureParameter] = PARAMETERS
+
+  /**
+   * Returns the output schema for the procedure result.
+   *
+   * @return StructType containing table, validation_result, transactions_validated,
+   *         issues_found, and details columns
+   */
+  def outputType: StructType = OUTPUT_TYPE
+
+  /**
+   * Executes the audit lock validation procedure.
+   *
+   * @param args Procedure arguments containing table name or path
+   * @return Sequence containing a single Row with validation results
+   * @throws IllegalArgumentException if neither table nor path is provided, 
or both are provided
+   */
+  override def call(args: ProcedureArgs): Seq[Row] = {
+    super.checkArgs(PARAMETERS, args)
+
+    val tableName = getArgValueOrDefault(args, PARAMETERS(0))
+    val tablePath = getArgValueOrDefault(args, PARAMETERS(1))
+
+    // Get the base path using BaseProcedure helper (handles table/path 
validation)
+    val basePath: String = getBasePath(tableName, tablePath)
+    val metaClient = createMetaClient(jsc, basePath)
+
+    // Use the table name if provided; otherwise fall back to the table path itself
+    val displayName = 
tableName.map(_.asInstanceOf[String]).getOrElse(tablePath.get.asInstanceOf[String])
+
+    try {
+      val auditFolderPath = new 
StoragePath(StorageLockProviderAuditService.getAuditFolderPath(basePath))
+      val storage = metaClient.getStorage
+
+      // Check if audit folder exists
+      if (!storage.exists(auditFolderPath)) {
+        Seq(Row(displayName, "PASSED", 0, 0, "No audit folder found - nothing 
to validate"))
+      } else {
+
+        // Get all audit files
+        val allFiles = storage.listDirectEntries(auditFolderPath).asScala
+        val auditFiles = allFiles.filter(pathInfo => pathInfo.isFile && 
pathInfo.getPath.getName.endsWith(".jsonl"))
+
+        if (auditFiles.isEmpty) {
+          Seq(Row(displayName, "PASSED", 0, 0, "No audit files found - nothing 
to validate"))
+        } else {
+
+          // Parse all audit files into transaction windows
+          val windows = auditFiles.flatMap(pathInfo => 
parseAuditFile(pathInfo, storage)).toSeq
+
+          if (windows.isEmpty) {
+            Seq(Row(displayName, "FAILED", 0, auditFiles.size, "Failed to 
parse any audit files"))
+          } else {
+
+            // Validate transactions
+            val validationResults = validateTransactionWindows(windows)
+
+            // Generate result
+            val (result, issuesFound, details) = 
formatValidationResults(validationResults)
+
+            Seq(Row(displayName, result, windows.size, issuesFound, details))

Review Comment:
   Yes, but that would require significant refactoring across the whole API. I feel we 
shouldn't be doing that in this PR.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to