This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new ca01dcf4ef3a [HUDI-9782] Add audit service implementation for storage
based lock provider (#13868)
ca01dcf4ef3a is described below
commit ca01dcf4ef3a342d360ac919ccc457d402b048c7
Author: Alex R <[email protected]>
AuthorDate: Thu Sep 11 20:44:06 2025 -0700
[HUDI-9782] Add audit service implementation for storage based lock
provider (#13868)
---
.../hudi/cli/commands/LockAuditingCommand.java | 188 ++++++++++
.../hudi/cli/commands/TestLockAuditingCommand.java | 285 ++++++++++++++++
.../client/transaction/lock/StorageLockClient.java | 2 +-
.../lock/audit/AuditServiceFactory.java | 26 +-
.../audit/StorageLockProviderAuditService.java | 179 ++++++++++
.../lock/TestStorageBasedLockProvider.java | 58 ++++
.../lock/audit/TestAuditServiceFactory.java | 5 +-
.../audit/TestStorageLockProviderAuditService.java | 379 +++++++++++++++++++++
.../hudi/command/procedures/HoodieProcedures.scala | 2 +
.../command/procedures/SetAuditLockProcedure.scala | 187 ++++++++++
.../procedures/ShowAuditLockStatusProcedure.scala | 168 +++++++++
.../hudi/procedure/TestSetAuditLockProcedure.scala | 169 +++++++++
.../TestShowAuditLockStatusProcedure.scala | 223 ++++++++++++
13 files changed, 1856 insertions(+), 15 deletions(-)
diff --git
a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/LockAuditingCommand.java
b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/LockAuditingCommand.java
new file mode 100644
index 000000000000..51e2dbe7fe6c
--- /dev/null
+++
b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/LockAuditingCommand.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.cli.commands;
+
+import org.apache.hudi.cli.HoodieCLI;
+import
org.apache.hudi.client.transaction.lock.audit.StorageLockProviderAuditService;
+import org.apache.hudi.common.util.FileIOUtils;
+import org.apache.hudi.storage.StoragePath;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.shell.standard.ShellComponent;
+import org.springframework.shell.standard.ShellMethod;
+import org.springframework.shell.standard.ShellOption;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+
+/**
+ * CLI commands for managing Hudi table lock auditing functionality.
+ */
+@ShellComponent
+public class LockAuditingCommand {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(LockAuditingCommand.class);
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+ /**
+ * Enables lock audit logging for the currently connected Hudi table.
+ * This command creates or updates the audit configuration file to enable
+ * audit logging for storage lock operations.
+ *
+ * @return Status message indicating success or failure
+ */
+ @ShellMethod(key = "locks audit enable", value = "Enable storage lock audit
service for the current table")
+ public String enableLockAudit() {
+
+ if (HoodieCLI.basePath == null) {
+ return "No Hudi table loaded. Please connect to a table first.";
+ }
+
+ try {
+ // Create the audit config file path using utility method
+ String auditConfigPath =
StorageLockProviderAuditService.getAuditConfigPath(HoodieCLI.basePath);
+
+ // Create the JSON content
+ ObjectNode configJson = OBJECT_MAPPER.createObjectNode();
+
configJson.put(StorageLockProviderAuditService.STORAGE_LOCK_AUDIT_SERVICE_ENABLED_FIELD,
true);
+ String jsonContent = OBJECT_MAPPER.writeValueAsString(configJson);
+
+ // Write the config file using HoodieStorage
+ StoragePath configPath = new StoragePath(auditConfigPath);
+ try (OutputStream outputStream = HoodieCLI.storage.create(configPath,
true)) {
+ outputStream.write(jsonContent.getBytes());
+ }
+
+ return String.format("Lock audit enabled successfully.\nAudit config
written to: %s\n"
+ + "Audit files will be stored at: %s", auditConfigPath,
StorageLockProviderAuditService.getAuditFolderPath(HoodieCLI.basePath));
+
+ } catch (Exception e) {
+ LOG.error("Error enabling lock audit", e);
+ return String.format("Failed to enable lock audit: %s", e.getMessage());
+ }
+ }
+
+ /**
+ * Disables lock audit logging for the currently connected Hudi table.
+ * This command updates the audit configuration file to disable audit
logging.
+ *
+ * @param keepAuditFiles Whether to preserve existing audit files when
disabling
+ * @return Status message indicating success or failure
+ */
+ @ShellMethod(key = "locks audit disable", value = "Disable storage lock
audit service for the current table")
+ public String disableLockAudit(
+ @ShellOption(value = {"--keepAuditFiles"}, defaultValue = "true",
+ help = "Keep existing audit files when disabling") final boolean
keepAuditFiles) {
+
+ if (HoodieCLI.basePath == null) {
+ return "No Hudi table loaded. Please connect to a table first.";
+ }
+
+ try {
+ // Create the audit config file path
+ String auditConfigPath =
StorageLockProviderAuditService.getAuditConfigPath(HoodieCLI.basePath);
+
+ // Check if config file exists
+ StoragePath configPath = new StoragePath(auditConfigPath);
+ if (!HoodieCLI.storage.exists(configPath)) {
+ return "Lock audit is already disabled (no configuration file found).";
+ }
+
+ // Create the JSON content with audit disabled
+ ObjectNode configJson = OBJECT_MAPPER.createObjectNode();
+
configJson.put(StorageLockProviderAuditService.STORAGE_LOCK_AUDIT_SERVICE_ENABLED_FIELD,
false);
+ String jsonContent = OBJECT_MAPPER.writeValueAsString(configJson);
+
+ // Write the config file
+ try (OutputStream outputStream = HoodieCLI.storage.create(configPath,
true)) {
+ outputStream.write(jsonContent.getBytes());
+ }
+
+ String message = String.format("Lock audit disabled successfully.\nAudit
config updated at: %s", auditConfigPath);
+
+ if (keepAuditFiles) {
+ message += String.format("\nExisting audit files preserved at: %s",
StorageLockProviderAuditService.getAuditFolderPath(HoodieCLI.basePath));
+ } else {
+ // Todo: write then call the api method to prune the old files
+ message += String.format("\nAudit files cleaned up at: %s",
StorageLockProviderAuditService.getAuditFolderPath(HoodieCLI.basePath));
+ }
+
+ return message;
+
+ } catch (Exception e) {
+ LOG.error("Error disabling lock audit", e);
+ return String.format("Failed to disable lock audit: %s", e.getMessage());
+ }
+ }
+
+ /**
+ * Shows the current status of lock audit logging for the connected table.
+ * This command checks the audit configuration file and reports whether
+ * auditing is currently enabled or disabled.
+ *
+ * @return Status information about the current audit configuration
+ */
+ @ShellMethod(key = "locks audit status", value = "Show the current status of
lock audit service")
+ public String showLockAuditStatus() {
+
+ if (HoodieCLI.basePath == null) {
+ return "No Hudi table loaded. Please connect to a table first.";
+ }
+
+ try {
+ // Create the audit config file path
+ String auditConfigPath =
StorageLockProviderAuditService.getAuditConfigPath(HoodieCLI.basePath);
+
+ // Check if config file exists
+ StoragePath configPath = new StoragePath(auditConfigPath);
+ if (!HoodieCLI.storage.exists(configPath)) {
+ return String.format("Lock Audit Status: DISABLED\n"
+ + "Table: %s\n"
+ + "Config file: %s (not found)\n"
+ + "Use 'locks audit enable' to enable audit logging.",
+ HoodieCLI.basePath, auditConfigPath);
+ }
+
+ // Read and parse the configuration
+ String configContent;
+ try (InputStream inputStream = HoodieCLI.storage.open(configPath)) {
+ configContent = new String(FileIOUtils.readAsByteArray(inputStream));
+ }
+ JsonNode rootNode = OBJECT_MAPPER.readTree(configContent);
+ JsonNode enabledNode =
rootNode.get(StorageLockProviderAuditService.STORAGE_LOCK_AUDIT_SERVICE_ENABLED_FIELD);
+ boolean isEnabled = enabledNode != null && enabledNode.asBoolean(false);
+
+ String status = isEnabled ? "ENABLED" : "DISABLED";
+
+ return String.format("Lock Audit Status: %s\n"
+ + "Table: %s\n"
+ + "Config file: %s\n"
+ + "Audit files location: %s",
+ status, HoodieCLI.basePath, auditConfigPath,
StorageLockProviderAuditService.getAuditFolderPath(HoodieCLI.basePath));
+
+ } catch (Exception e) {
+ LOG.error("Error checking lock audit status", e);
+ return String.format("Failed to check lock audit status: %s",
e.getMessage());
+ }
+ }
+}
\ No newline at end of file
diff --git
a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestLockAuditingCommand.java
b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestLockAuditingCommand.java
new file mode 100644
index 000000000000..da346110e124
--- /dev/null
+++
b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestLockAuditingCommand.java
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.cli.commands;
+
+import org.apache.hudi.cli.HoodieCLI;
+import org.apache.hudi.cli.functional.CLIFunctionalTestHarness;
+import org.apache.hudi.cli.testutils.ShellEvaluationResultUtil;
+import
org.apache.hudi.client.transaction.lock.audit.StorageLockProviderAuditService;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.util.FileIOUtils;
+import org.apache.hudi.storage.StoragePath;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.junit.jupiter.api.BeforeEach;
+
+import java.io.InputStream;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.shell.Shell;
+
+import static org.junit.jupiter.api.Assertions.assertAll;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Test class for {@link org.apache.hudi.cli.commands.LockAuditingCommand}.
+ */
+@Tag("functional")
+@SpringBootTest(properties = {"spring.shell.interactive.enabled=false",
"spring.shell.command.script.enabled=false"})
+public class TestLockAuditingCommand extends CLIFunctionalTestHarness {
+
+ @Autowired
+ private Shell shell;
+
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+ @BeforeEach
+ public void setUp() throws Exception {
+ HoodieCLI.conf = storageConf();
+ String tableName = tableName();
+ String tablePath = tablePath(tableName);
+ HoodieCLI.basePath = tablePath;
+
+ // Initialize table
+ HoodieTableMetaClient.newTableBuilder()
+ .setTableType(HoodieTableType.COPY_ON_WRITE.name())
+ .setTableName(tableName)
+ .setRecordKeyFields("key")
+ .initTable(storageConf(), tablePath);
+
+ // Initialize storage
+ HoodieCLI.initFS(true);
+ }
+
+ /**
+ * Test enabling lock audit when no table is loaded.
+ */
+ @Test
+ public void testEnableLockAuditNoTable() {
+ // Clear the base path to simulate no table loaded
+ String originalBasePath = HoodieCLI.basePath;
+ HoodieCLI.basePath = null;
+
+ try {
+ Object result = shell.evaluate(() -> "locks audit enable");
+ assertAll("Command runs with no table loaded",
+ () -> assertTrue(ShellEvaluationResultUtil.isSuccess(result)),
+ () -> assertNotNull(result.toString()),
+ () -> assertEquals("No Hudi table loaded. Please connect to a table
first.", result.toString()));
+ } finally {
+ HoodieCLI.basePath = originalBasePath;
+ }
+ }
+
+ /**
+ * Test disabling lock audit when no table is loaded.
+ */
+ @Test
+ public void testDisableLockAuditNoTable() {
+ // Clear the base path to simulate no table loaded
+ String originalBasePath = HoodieCLI.basePath;
+ HoodieCLI.basePath = null;
+
+ try {
+ Object result = shell.evaluate(() -> "locks audit disable");
+ assertAll("Command runs with no table loaded",
+ () -> assertTrue(ShellEvaluationResultUtil.isSuccess(result)),
+ () -> assertNotNull(result.toString()),
+ () -> assertEquals("No Hudi table loaded. Please connect to a table
first.", result.toString()));
+ } finally {
+ HoodieCLI.basePath = originalBasePath;
+ }
+ }
+
+ /**
+ * Test showing lock audit status when no table is loaded.
+ */
+ @Test
+ public void testShowLockAuditStatusNoTable() {
+ // Clear the base path to simulate no table loaded
+ String originalBasePath = HoodieCLI.basePath;
+ HoodieCLI.basePath = null;
+
+ try {
+ Object result = shell.evaluate(() -> "locks audit status");
+ assertAll("Command runs with no table loaded",
+ () -> assertTrue(ShellEvaluationResultUtil.isSuccess(result)),
+ () -> assertNotNull(result.toString()),
+ () -> assertEquals("No Hudi table loaded. Please connect to a table
first.", result.toString()));
+ } finally {
+ HoodieCLI.basePath = originalBasePath;
+ }
+ }
+
+ /**
+ * Test enabling lock audit successfully.
+ */
+ @Test
+ public void testEnableLockAuditSuccess() throws Exception {
+ Object result = shell.evaluate(() -> "locks audit enable");
+
+ assertAll("Enable command runs successfully",
+ () -> assertTrue(ShellEvaluationResultUtil.isSuccess(result)),
+ () -> assertNotNull(result.toString()),
+ () -> assertTrue(result.toString().contains("Lock audit enabled
successfully")));
+
+ // Verify the config file was created with correct content
+ String lockFolderPath = String.format("%s%s.hoodie%s.locks",
HoodieCLI.basePath, StoragePath.SEPARATOR, StoragePath.SEPARATOR);
+ String auditConfigPath = String.format("%s%s%s", lockFolderPath,
StoragePath.SEPARATOR, StorageLockProviderAuditService.AUDIT_CONFIG_FILE_NAME);
+ StoragePath configPath = new StoragePath(auditConfigPath);
+
+ assertTrue(HoodieCLI.storage.exists(configPath), "Config file should
exist");
+
+ String configContent;
+ try (InputStream inputStream = HoodieCLI.storage.open(configPath)) {
+ configContent = new String(FileIOUtils.readAsByteArray(inputStream));
+ }
+ JsonNode rootNode = OBJECT_MAPPER.readTree(configContent);
+ JsonNode enabledNode =
rootNode.get(StorageLockProviderAuditService.STORAGE_LOCK_AUDIT_SERVICE_ENABLED_FIELD);
+
+ assertNotNull(enabledNode, "Config should contain enabled field");
+ assertTrue(enabledNode.asBoolean(), "Audit should be enabled");
+ }
+
+ /**
+ * Test showing lock audit status when disabled (no config file).
+ */
+ @Test
+ public void testShowLockAuditStatusDisabled() {
+ Object result = shell.evaluate(() -> "locks audit status");
+
+ assertAll("Status command shows disabled state",
+ () -> assertTrue(ShellEvaluationResultUtil.isSuccess(result)),
+ () -> assertNotNull(result.toString()),
+ () -> assertTrue(result.toString().contains("Lock Audit Status:
DISABLED")),
+ () -> assertTrue(result.toString().contains("(not found)")));
+ }
+
+ /**
+ * Test the complete workflow: enable, check status, disable, check status.
+ */
+ @Test
+ public void testCompleteAuditWorkflow() throws Exception {
+ // 1. Enable audit
+ Object enableResult = shell.evaluate(() -> "locks audit enable");
+ assertTrue(ShellEvaluationResultUtil.isSuccess(enableResult));
+ assertTrue(enableResult.toString().contains("Lock audit enabled
successfully"));
+
+ // 2. Check status - should be enabled
+ Object statusEnabledResult = shell.evaluate(() -> "locks audit status");
+ assertTrue(ShellEvaluationResultUtil.isSuccess(statusEnabledResult));
+ assertTrue(statusEnabledResult.toString().contains("Lock Audit Status:
ENABLED"));
+
+ // 3. Disable audit with default keepAuditFiles=true
+ Object disableResult = shell.evaluate(() -> "locks audit disable");
+ assertTrue(ShellEvaluationResultUtil.isSuccess(disableResult));
+ assertTrue(disableResult.toString().contains("Lock audit disabled
successfully"));
+ assertTrue(disableResult.toString().contains("Existing audit files
preserved"));
+
+ // 4. Check status - should be disabled
+ Object statusDisabledResult = shell.evaluate(() -> "locks audit status");
+ assertTrue(ShellEvaluationResultUtil.isSuccess(statusDisabledResult));
+ assertTrue(statusDisabledResult.toString().contains("Lock Audit Status:
DISABLED"));
+
+ // Verify the config file still exists but with audit disabled
+ String lockFolderPath = String.format("%s%s.hoodie%s.locks",
HoodieCLI.basePath, StoragePath.SEPARATOR, StoragePath.SEPARATOR);
+ String auditConfigPath = String.format("%s%s%s", lockFolderPath,
StoragePath.SEPARATOR, StorageLockProviderAuditService.AUDIT_CONFIG_FILE_NAME);
+ StoragePath configPath = new StoragePath(auditConfigPath);
+
+ assertTrue(HoodieCLI.storage.exists(configPath), "Config file should still
exist");
+
+ String configContent;
+ try (InputStream inputStream = HoodieCLI.storage.open(configPath)) {
+ configContent = new String(FileIOUtils.readAsByteArray(inputStream));
+ }
+ JsonNode rootNode = OBJECT_MAPPER.readTree(configContent);
+ JsonNode enabledNode =
rootNode.get(StorageLockProviderAuditService.STORAGE_LOCK_AUDIT_SERVICE_ENABLED_FIELD);
+
+ assertNotNull(enabledNode, "Config should contain enabled field");
+ assertFalse(enabledNode.asBoolean(), "Audit should be disabled");
+ }
+
+ /**
+ * Test disabling audit when no config file exists.
+ */
+ @Test
+ public void testDisableLockAuditNoConfigFile() {
+ Object result = shell.evaluate(() -> "locks audit disable");
+
+ assertAll("Disable command handles missing config file",
+ () -> assertTrue(ShellEvaluationResultUtil.isSuccess(result)),
+ () -> assertNotNull(result.toString()),
+ () -> assertEquals("Lock audit is already disabled (no configuration
file found).", result.toString()));
+ }
+
+ /**
+ * Test disabling audit with keepAuditFiles=false option.
+ */
+ @Test
+ public void testDisableLockAuditWithoutKeepingFiles() throws Exception {
+ // First enable audit
+ shell.evaluate(() -> "locks audit enable");
+
+ // Disable with keepAuditFiles=false
+ Object result = shell.evaluate(() -> "locks audit disable --keepAuditFiles
false");
+
+ assertAll("Disable command with keepAuditFiles=false",
+ () -> assertTrue(ShellEvaluationResultUtil.isSuccess(result)),
+ () -> assertNotNull(result.toString()),
+ () -> assertTrue(result.toString().contains("Lock audit disabled
successfully")),
+ () -> assertTrue(result.toString().contains("Audit files cleaned up
at:")));
+ }
+
+ /**
+ * Test enabling audit multiple times (should overwrite).
+ */
+ @Test
+ public void testEnableLockAuditMultipleTimes() throws Exception {
+ // Enable first time
+ Object result1 = shell.evaluate(() -> "locks audit enable");
+ assertTrue(ShellEvaluationResultUtil.isSuccess(result1));
+ assertTrue(result1.toString().contains("Lock audit enabled successfully"));
+
+ // Enable second time (should succeed and overwrite)
+ Object result2 = shell.evaluate(() -> "locks audit enable");
+ assertTrue(ShellEvaluationResultUtil.isSuccess(result2));
+ assertTrue(result2.toString().contains("Lock audit enabled successfully"));
+
+ // Verify config is still correct
+ String lockFolderPath = String.format("%s%s.hoodie%s.locks",
HoodieCLI.basePath, StoragePath.SEPARATOR, StoragePath.SEPARATOR);
+ String auditConfigPath = String.format("%s%s%s", lockFolderPath,
StoragePath.SEPARATOR, StorageLockProviderAuditService.AUDIT_CONFIG_FILE_NAME);
+ StoragePath configPath = new StoragePath(auditConfigPath);
+
+ String configContent;
+ try (InputStream inputStream = HoodieCLI.storage.open(configPath)) {
+ configContent = new String(FileIOUtils.readAsByteArray(inputStream));
+ }
+ JsonNode rootNode = OBJECT_MAPPER.readTree(configContent);
+ JsonNode enabledNode =
rootNode.get(StorageLockProviderAuditService.STORAGE_LOCK_AUDIT_SERVICE_ENABLED_FIELD);
+
+ assertTrue(enabledNode.asBoolean(), "Audit should still be enabled");
+ }
+}
\ No newline at end of file
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/StorageLockClient.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/StorageLockClient.java
index 865013f3acaa..fa5c55171983 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/StorageLockClient.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/StorageLockClient.java
@@ -86,7 +86,7 @@ public interface StorageLockClient extends AutoCloseable {
* This is a static utility method that can be used without creating an
instance.
*
* @param basePath The base path of the Hudi table
- * @return The lock folder path (e.g., "s3://bucket/table/.hoodie/locks")
+ * @return The lock folder path (e.g., "s3://bucket/table/.hoodie/.locks")
*/
static String getLockFolderPath(String basePath) {
return String.format("%s%s%s", basePath, StoragePath.SEPARATOR,
LOCKS_FOLDER_NAME);
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/audit/AuditServiceFactory.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/audit/AuditServiceFactory.java
index eb9accaf4fdc..871330886916 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/audit/AuditServiceFactory.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/audit/AuditServiceFactory.java
@@ -20,7 +20,6 @@ package org.apache.hudi.client.transaction.lock.audit;
import org.apache.hudi.client.transaction.lock.StorageLockClient;
import org.apache.hudi.common.util.Option;
-import org.apache.hudi.storage.StoragePath;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -37,12 +36,12 @@ import java.util.function.Supplier;
public class AuditServiceFactory {
private static final Logger LOG =
LoggerFactory.getLogger(AuditServiceFactory.class);
- private static final String AUDIT_CONFIG_FILE_NAME = "audit_enabled.json";
- private static final String STORAGE_LOCK_AUDIT_SERVICE_ENABLED_FIELD =
"STORAGE_LOCK_AUDIT_SERVICE_ENABLED";
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
/**
* Creates a lock provider audit service instance by checking the audit
configuration file.
+ * This method reads the audit configuration to determine if auditing is
enabled.
+ * If auditing is enabled, it returns a StorageLockProviderAuditService
instance.
*
* @param ownerId The owner ID for the lock provider
* @param basePath The base path of the Hudi table
@@ -63,13 +62,21 @@ public class AuditServiceFactory {
if (!isAuditEnabled(basePath, storageLockClient)) {
return Option.empty();
}
-
- // No-op, will add in a follow up
- return Option.empty();
+
+ return Option.of(new StorageLockProviderAuditService(
+ basePath,
+ ownerId,
+ transactionStartTime,
+ storageLockClient,
+ lockExpirationFunction,
+ lockHeldSupplier));
}
/**
* Checks if audit is enabled by reading the audit configuration file.
+ * This method looks for the audit configuration file at the expected
location
+ * and parses it to determine if auditing is enabled. Returns false if the
+ * configuration file doesn't exist or cannot be read.
*
* @param basePath The base path of the Hudi table
* @param storageLockClient The storage lock client to use for reading
configuration
@@ -77,9 +84,8 @@ public class AuditServiceFactory {
*/
private static boolean isAuditEnabled(String basePath, StorageLockClient
storageLockClient) {
try {
- // Construct the audit config path using the same lock folder as the
lock file
- String lockFolderPath = StorageLockClient.getLockFolderPath(basePath);
- String auditConfigPath = String.format("%s%s%s", lockFolderPath,
StoragePath.SEPARATOR, AUDIT_CONFIG_FILE_NAME);
+ // Construct the audit config path using the utility method
+ String auditConfigPath =
StorageLockProviderAuditService.getAuditConfigPath(basePath);
LOG.debug("Checking for audit configuration at: {}", auditConfigPath);
@@ -90,7 +96,7 @@ public class AuditServiceFactory {
if (jsonContent.isPresent()) {
LOG.debug("Audit configuration file found, parsing content");
JsonNode rootNode = OBJECT_MAPPER.readTree(jsonContent.get());
- JsonNode enabledNode =
rootNode.get(STORAGE_LOCK_AUDIT_SERVICE_ENABLED_FIELD);
+ JsonNode enabledNode =
rootNode.get(StorageLockProviderAuditService.STORAGE_LOCK_AUDIT_SERVICE_ENABLED_FIELD);
boolean isEnabled = enabledNode != null &&
enabledNode.asBoolean(false);
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/audit/StorageLockProviderAuditService.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/audit/StorageLockProviderAuditService.java
new file mode 100644
index 000000000000..ba353fd8eacb
--- /dev/null
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/audit/StorageLockProviderAuditService.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.transaction.lock.audit;
+
+import org.apache.hudi.client.transaction.lock.StorageLockClient;
+import org.apache.hudi.storage.StoragePath;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+/**
+ * Storage-based audit service implementation for lock provider operations.
+ * Writes audit records to a single JSONL file per transaction to track lock
lifecycle events.
+ */
+public class StorageLockProviderAuditService implements AuditService {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(StorageLockProviderAuditService.class);
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+ // Audit configuration constants
+ public static final String AUDIT_FOLDER_NAME = "audit";
+ public static final String AUDIT_CONFIG_FILE_NAME = "audit_enabled.json";
+ public static final String STORAGE_LOCK_AUDIT_SERVICE_ENABLED_FIELD =
"STORAGE_LOCK_AUDIT_SERVICE_ENABLED";
+
+ /**
+ * Constructs the full path to the audit configuration file for a given
table.
+ *
+ * @param basePath The base path of the Hudi table
+ * @return The full path to the audit_enabled.json configuration file
+ */
+ public static String getAuditConfigPath(String basePath) {
+ String lockFolderPath = StorageLockClient.getLockFolderPath(basePath);
+ return String.format("%s%s%s", lockFolderPath, StoragePath.SEPARATOR,
AUDIT_CONFIG_FILE_NAME);
+ }
+
+ /**
+ * Constructs the full path to the audit folder for a given table.
+ *
+ * @param basePath The base path of the Hudi table
+ * @return The full path to the audit folder where audit files are stored
+ */
+ public static String getAuditFolderPath(String basePath) {
+ String lockFolderPath = StorageLockClient.getLockFolderPath(basePath);
+ return String.format("%s%s%s", lockFolderPath, StoragePath.SEPARATOR,
AUDIT_FOLDER_NAME);
+ }
+
+ private final String ownerId;
+ private final long transactionStartTime;
+ private final String auditFilePath;
+ private final StorageLockClient storageLockClient;
+ private final Function<Long, Long> lockExpirationFunction;
+ private final Supplier<Boolean> lockHeldSupplier;
+ private final StringBuilder auditBuffer;
+
+ /**
+ * Creates a new StorageLockProviderAuditService instance.
+ *
+ * @param basePath The base path where audit files will be written
+ * @param ownerId The full owner ID for the lock
+ * @param transactionStartTime The timestamp when the transaction started
(lock acquired)
+ * @param storageLockClient The storage client for writing audit files
+ * @param lockExpirationFunction Function that takes a timestamp and returns
the lock expiration time
+ * @param lockHeldSupplier Supplier that provides whether the lock is
currently held
+ */
+ public StorageLockProviderAuditService(
+ String basePath,
+ String ownerId,
+ long transactionStartTime,
+ StorageLockClient storageLockClient,
+ Function<Long, Long> lockExpirationFunction,
+ Supplier<Boolean> lockHeldSupplier) {
+ this.ownerId = ownerId;
+ this.transactionStartTime = transactionStartTime;
+ this.storageLockClient = storageLockClient;
+ this.lockExpirationFunction = lockExpirationFunction;
+ this.lockHeldSupplier = lockHeldSupplier;
+ this.auditBuffer = new StringBuilder();
+
+ // Generate audit file path: <txn-start>_<full-owner-id>.jsonl
+ String filename = String.format("%d_%s.jsonl", transactionStartTime,
ownerId);
+ this.auditFilePath = String.format("%s%s%s",
+ getAuditFolderPath(basePath),
+ StoragePath.SEPARATOR,
+ filename);
+
+ LOG.debug("Initialized audit service for transaction starting at {} with
file: {}",
+ transactionStartTime, auditFilePath);
+ }
+
+ /**
+ * Records an audit operation with the current timestamp and state.
+ *
+ * @param state The audit operation state to record
+ * @param timestamp The timestamp when the operation occurred
+ * @throws Exception if there's an error creating or writing the audit record
+ */
+ @Override
+ public synchronized void recordOperation(AuditOperationState state, long
timestamp) throws Exception {
+ // Create audit record
+ Map<String, Object> auditRecord = new HashMap<>();
+ auditRecord.put("ownerId", ownerId);
+ auditRecord.put("transactionStartTime", transactionStartTime);
+ auditRecord.put("timestamp", timestamp);
+ auditRecord.put("state", state.name());
+ auditRecord.put("lockExpiration", lockExpirationFunction.apply(timestamp));
+ auditRecord.put("lockHeld", lockHeldSupplier.get());
+
+ // Convert to JSON and append newline for JSONL format
+ String jsonLine = OBJECT_MAPPER.writeValueAsString(auditRecord) + "\n";
+
+ // Append to buffer
+ auditBuffer.append(jsonLine);
+
+ // Write the accumulated audit records to file
+ writeAuditFile();
+
+ LOG.debug("Recorded audit operation: state={}, timestamp={}, file={}",
+ state, timestamp, auditFilePath);
+ }
+
+ /**
+ * Writes the accumulated audit records to the audit file.
+ * This method is called after each recordOperation to persist the audit log.
+ * Failures to write audit records are logged but do not throw exceptions
+ * to avoid breaking lock operations.
+ */
+ private void writeAuditFile() {
+ try {
+ // Write the entire buffer content to the audit file
+ // This overwrites the file with all accumulated records
+ String content = auditBuffer.toString();
+ boolean success = storageLockClient.writeObject(auditFilePath, content);
+ if (success) {
+ LOG.debug("Successfully wrote audit records to: {}", auditFilePath);
+ } else {
+ LOG.warn("Failed to write audit records to: {}", auditFilePath);
+ }
+ } catch (Exception e) {
+ LOG.warn("Failed to write audit records to: {}", auditFilePath, e);
+ // Don't throw exception - audit failures should not break lock
operations
+ }
+ }
+
+ /**
+ * Closes the audit service. Since all audit records are written after each
operation,
+ * no additional cleanup is required during close.
+ *
+ * @throws Exception if there's an error during cleanup (not expected in
current implementation)
+ */
+ @Override
+ public synchronized void close() throws Exception {
+ // All audit records are already written after each recordOperation()
+ // No additional writes needed during close
+ LOG.debug("Closed StorageLockProviderAuditService for transaction: {},
owner: {}",
+ transactionStartTime, ownerId);
+ }
+}
diff --git
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/lock/TestStorageBasedLockProvider.java
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/lock/TestStorageBasedLockProvider.java
index 4ae637cbdc18..6675cc14ecf4 100644
---
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/lock/TestStorageBasedLockProvider.java
+++
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/lock/TestStorageBasedLockProvider.java
@@ -842,6 +842,64 @@ class TestStorageBasedLockProvider {
auditLockProvider.close();
}
+ @Test
+ void testAuditServiceIntegrationWhenConfigEnabled() {
+ // Test that lock provider works correctly when audit is enabled
+ TypedProperties props = new TypedProperties();
+ props.put(StorageBasedLockConfig.VALIDITY_TIMEOUT_SECONDS.key(), "10");
+ props.put(StorageBasedLockConfig.HEARTBEAT_POLL_SECONDS.key(), "1");
+ props.put(BASE_PATH.key(), "gs://bucket/lake/db/tbl-audit-enabled");
+
+ // Mock client that returns enabled config
+ StorageLockClient auditMockClient = mock(StorageLockClient.class);
+ String enabledConfig = "{\"STORAGE_LOCK_AUDIT_SERVICE_ENABLED\": true}";
+ when(auditMockClient.readObject(anyString(), eq(true)))
+ .thenReturn(Option.of(enabledConfig));
+ when(auditMockClient.readCurrentLockFile())
+ .thenReturn(Pair.of(LockGetResult.NOT_EXISTS, Option.empty()));
+ // Mock writeObject method to return true for audit file writes
+ when(auditMockClient.writeObject(anyString(), anyString()))
+ .thenReturn(true);
+
+ StorageBasedLockProvider auditLockProvider = new StorageBasedLockProvider(
+ ownerId,
+ props,
+ (a, b, c) -> mockHeartbeatManager,
+ (a, b, c) -> auditMockClient,
+ mockLogger,
+ null);
+
+ // Set up lock acquisition
+ StorageLockData data = new StorageLockData(false,
System.currentTimeMillis() + DEFAULT_LOCK_VALIDITY_MS, ownerId);
+ StorageLockFile lockFile = new StorageLockFile(data, "v1");
+ when(auditMockClient.tryUpsertLockFile(any(), eq(Option.empty())))
+ .thenReturn(Pair.of(LockUpsertResult.SUCCESS, Option.of(lockFile)));
+ when(mockHeartbeatManager.startHeartbeatForThread(any())).thenReturn(true);
+
+ // tryLock should trigger audit service creation and START audit
+ assertTrue(auditLockProvider.tryLock());
+
+ // Verify audit config was checked during tryLock
+ verify(auditMockClient, times(1)).readObject(
+ contains(".locks/audit_enabled.json"), eq(true));
+
+ // Verify audit START operation was written
+ verify(auditMockClient, times(1)).writeObject(
+ contains(".locks/audit/"), anyString());
+
+ // Unlock should trigger END audit
+ when(auditMockClient.tryUpsertLockFile(any(), any()))
+ .thenReturn(Pair.of(LockUpsertResult.SUCCESS, Option.of(lockFile)));
+ when(mockHeartbeatManager.stopHeartbeat(anyBoolean())).thenReturn(true);
+ auditLockProvider.unlock();
+
+ // Verify audit END operation was written (total 2 writes: START and END)
+ verify(auditMockClient, times(2)).writeObject(
+ contains(".locks/audit/"), anyString());
+
+ auditLockProvider.close();
+ }
+
public static class StubStorageLockClient implements StorageLockClient {
public StubStorageLockClient(String ownerId, String lockFileUri,
Properties props) {
assertTrue(lockFileUri.endsWith("table_lock.json"));
diff --git
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/lock/audit/TestAuditServiceFactory.java
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/lock/audit/TestAuditServiceFactory.java
index e944fdc89615..a53c829e8d75 100644
---
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/lock/audit/TestAuditServiceFactory.java
+++
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/lock/audit/TestAuditServiceFactory.java
@@ -24,7 +24,6 @@ import org.apache.hudi.common.util.Option;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
@@ -94,15 +93,13 @@ public class TestAuditServiceFactory {
// Mock writeObject method to return true for audit file writes
when(mockStorageLockClient.writeObject(anyString(), anyString()))
.thenReturn(true);
-
Option<AuditService> result =
AuditServiceFactory.createLockProviderAuditService(
ownerId, basePath, mockStorageLockClient,
System.currentTimeMillis(),
timestamp -> timestamp + 10000, // lockExpirationFunction
() -> true); // lockHeldSupplier
- // Should return empty audit service as the service is not added yet.
- assertFalse(result.isPresent());
+ assertTrue(result.isPresent());
verify(mockStorageLockClient).readObject(expectedPath, true);
}
diff --git
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/lock/audit/TestStorageLockProviderAuditService.java
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/lock/audit/TestStorageLockProviderAuditService.java
new file mode 100644
index 000000000000..d5946a6ae410
--- /dev/null
+++
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/lock/audit/TestStorageLockProviderAuditService.java
@@ -0,0 +1,379 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.transaction.lock.audit;
+
+import org.apache.hudi.client.transaction.lock.StorageLockClient;
+import org.apache.hudi.storage.StoragePath;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.mockito.ArgumentCaptor;
+
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit tests for StorageLockProviderAuditService.
+ */
+public class TestStorageLockProviderAuditService {
+
+ private static final String BASE_PATH = "s3://bucket/table";
+ private static final String OWNER_ID =
"writer-12345678-9abc-def0-1234-567890abcdef";
+ private static final long TRANSACTION_START_TIME = 1234567890000L;
+ private static final long LOCK_EXPIRATION = 1000000L;
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+ private StorageLockClient storageLockClient;
+ private Supplier<Boolean> lockHeldSupplier;
+ private StorageLockProviderAuditService auditService;
+
+ @BeforeEach
+ void setUp() {
+ storageLockClient = mock(StorageLockClient.class);
+ lockHeldSupplier = mock(Supplier.class);
+
+ when(lockHeldSupplier.get()).thenReturn(true);
+
+ auditService = new StorageLockProviderAuditService(
+ BASE_PATH,
+ OWNER_ID,
+ TRANSACTION_START_TIME,
+ storageLockClient,
+ timestamp -> LOCK_EXPIRATION,
+ lockHeldSupplier);
+ }
+
+ /**
+ * Helper method to validate audit record structure and content.
+ *
+ * @param auditRecord The audit record to validate
+ * @param expectedState Expected operation state
+ * @param expectedTimestamp Expected timestamp
+ * @param expectedLockHeld Expected lock held status
+ */
+ private void validateAuditRecord(Map<String, Object> auditRecord,
AuditOperationState expectedState,
+ long expectedTimestamp, boolean
expectedLockHeld) {
+ assertNotNull(auditRecord, "Audit record should not be null");
+ assertEquals(6, auditRecord.size(), "Audit record should contain exactly 6
fields");
+
+ // Validate all required fields are present
+ assertNotNull(auditRecord.get("ownerId"), "ownerId should be present");
+ assertNotNull(auditRecord.get("transactionStartTime"),
"transactionStartTime should be present");
+ assertNotNull(auditRecord.get("timestamp"), "timestamp should be present");
+ assertNotNull(auditRecord.get("state"), "state should be present");
+ assertNotNull(auditRecord.get("lockExpiration"), "lockExpiration should be
present");
+ assertNotNull(auditRecord.get("lockHeld"), "lockHeld should be present");
+
+ // Validate field values
+ assertEquals(OWNER_ID, auditRecord.get("ownerId"));
+ assertEquals(TRANSACTION_START_TIME, ((Number)
auditRecord.get("transactionStartTime")).longValue());
+ assertEquals(expectedTimestamp, ((Number)
auditRecord.get("timestamp")).longValue());
+ assertEquals(expectedState.name(), auditRecord.get("state"));
+ assertEquals(LOCK_EXPIRATION, ((Number)
auditRecord.get("lockExpiration")).longValue());
+ assertEquals(expectedLockHeld, auditRecord.get("lockHeld"));
+ }
+
+ /**
+ * Helper method to verify expected file path generation.
+ *
+ * @param actualPath The actual file path captured
+ * @param ownerId The owner ID used
+ * @param transactionStartTime The transaction start time used
+ */
+ private void validateFilePath(String actualPath, String ownerId, long
transactionStartTime) {
+ String expectedPath =
String.format("%s%s.hoodie%s.locks%saudit%s%d_%s.jsonl",
+ BASE_PATH,
+ StoragePath.SEPARATOR,
+ StoragePath.SEPARATOR,
+ StoragePath.SEPARATOR,
+ StoragePath.SEPARATOR,
+ transactionStartTime,
+ ownerId);
+ assertEquals(expectedPath, actualPath);
+ }
+
+ /**
+ * Comprehensive test for complete audit lifecycle: START -> RENEW -> END.
+ * This consolidates individual operation tests into a single comprehensive
test
+ * that validates the entire audit flow.
+ */
+ @Test
+ void testCompleteAuditLifecycle() throws Exception {
+ when(storageLockClient.writeObject(anyString(),
anyString())).thenReturn(true);
+
+ long startTime = System.currentTimeMillis();
+ long renewTime = startTime + 1000;
+ long endTime = startTime + 2000;
+
+ // Configure lock held status for each operation
+ when(lockHeldSupplier.get()).thenReturn(true, true, false);
+
+ // Execute complete lifecycle
+ auditService.recordOperation(AuditOperationState.START, startTime);
+ auditService.recordOperation(AuditOperationState.RENEW, renewTime);
+ auditService.recordOperation(AuditOperationState.END, endTime);
+
+ // Capture all write calls
+ ArgumentCaptor<String> pathCaptor = ArgumentCaptor.forClass(String.class);
+ ArgumentCaptor<String> contentCaptor =
ArgumentCaptor.forClass(String.class);
+ verify(storageLockClient, times(3)).writeObject(pathCaptor.capture(),
contentCaptor.capture());
+
+ // Validate file path (should be consistent across all operations)
+ validateFilePath(pathCaptor.getValue(), OWNER_ID, TRANSACTION_START_TIME);
+
+ // Parse final content - should contain all three records
+ String finalContent = contentCaptor.getValue();
+ String[] lines = finalContent.trim().split("\n");
+ assertEquals(3, lines.length, "Should have three JSON lines");
+
+ // Validate each audit record
+ @SuppressWarnings("unchecked")
+ Map<String, Object> startRecord = OBJECT_MAPPER.readValue(lines[0],
Map.class);
+ @SuppressWarnings("unchecked")
+ Map<String, Object> renewRecord = OBJECT_MAPPER.readValue(lines[1],
Map.class);
+ @SuppressWarnings("unchecked")
+ Map<String, Object> endRecord = OBJECT_MAPPER.readValue(lines[2],
Map.class);
+
+ validateAuditRecord(startRecord, AuditOperationState.START, startTime,
true);
+ validateAuditRecord(renewRecord, AuditOperationState.RENEW, renewTime,
true);
+ validateAuditRecord(endRecord, AuditOperationState.END, endTime, false);
+ }
+
+ /**
+ * Test data provider for different owner ID formats.
+ *
+ * @return Stream of owner ID test cases
+ */
+ static java.util.stream.Stream<Arguments> ownerIdTestCases() {
+ return java.util.stream.Stream.of(
+ Arguments.of("12345678-9abc-def0-1234-567890abcdef", "Full UUID
format"),
+ Arguments.of("abc123", "Short owner ID"),
+ Arguments.of("regular-owner-id-without-uuid", "Regular owner ID"),
+ Arguments.of("writer-12345678-9abc-def0-1234-567890abcdef", "Writer
with UUID")
+ );
+ }
+
+ /**
+ * Test audit file naming with different owner ID formats.
+ * Consolidated test that verifies file path generation for various owner ID
formats.
+ *
+ * @param ownerId The owner ID to test
+ * @param description Description of the test case
+ */
+ @ParameterizedTest(name = "{1}: {0}")
+ @MethodSource("ownerIdTestCases")
+ void testFileNameWithDifferentOwnerIds(String ownerId, String description)
throws Exception {
+ long txnStartTime = System.currentTimeMillis();
+ StorageLockProviderAuditService service = new
StorageLockProviderAuditService(
+ BASE_PATH,
+ ownerId,
+ txnStartTime,
+ storageLockClient,
+ timestamp -> LOCK_EXPIRATION,
+ lockHeldSupplier);
+
+ when(storageLockClient.writeObject(anyString(),
anyString())).thenReturn(true);
+ service.recordOperation(AuditOperationState.START,
System.currentTimeMillis());
+
+ ArgumentCaptor<String> pathCaptor = ArgumentCaptor.forClass(String.class);
+ verify(storageLockClient, times(1)).writeObject(pathCaptor.capture(),
anyString());
+
+ validateFilePath(pathCaptor.getValue(), ownerId, txnStartTime);
+ }
+
+ /**
+ * Test data provider for write failure scenarios.
+ *
+ * @return Stream of write failure test cases
+ */
+ static Stream<Arguments> writeFailureTestCases() {
+ return Stream.of(
+ Arguments.of(false, "Write returns false"),
+ Arguments.of(new RuntimeException("Storage error"), "Write throws
exception")
+ );
+ }
+
+ /**
+ * Test handling of write failures using parameterized test.
+ * Consolidates multiple write failure scenarios into a single parameterized
test.
+ *
+ * @param failureCondition The failure condition (Boolean false or Exception)
+ * @param description Description of the test case
+ */
+ @ParameterizedTest(name = "{1}")
+ @MethodSource("writeFailureTestCases")
+ void testWriteFailureHandling(Object failureCondition, String description)
throws Exception {
+ long timestamp = System.currentTimeMillis();
+
+ if (failureCondition instanceof Boolean) {
+ when(storageLockClient.writeObject(anyString(),
anyString())).thenReturn((Boolean) failureCondition);
+ } else if (failureCondition instanceof Exception) {
+ when(storageLockClient.writeObject(anyString(),
anyString())).thenThrow((Exception) failureCondition);
+ }
+
+ // Should not throw exception - audit failures should be handled gracefully
+ auditService.recordOperation(AuditOperationState.START, timestamp);
+
+ verify(storageLockClient, times(1)).writeObject(anyString(), anyString());
+ }
+
+ @Test
+ void testDynamicFunctionValues() throws Exception {
+ long timestamp = System.currentTimeMillis();
+ when(storageLockClient.writeObject(anyString(),
anyString())).thenReturn(true);
+
+ // Create a service with dynamic expiration calculation
+ StorageLockProviderAuditService dynamicService = new
StorageLockProviderAuditService(
+ BASE_PATH,
+ OWNER_ID,
+ TRANSACTION_START_TIME,
+ storageLockClient,
+ ts -> ts + 1000L, // Adds 1000ms to timestamp
+ lockHeldSupplier);
+
+ when(lockHeldSupplier.get()).thenReturn(true, false);
+
+ dynamicService.recordOperation(AuditOperationState.START, timestamp);
+ dynamicService.recordOperation(AuditOperationState.END, timestamp + 1000);
+
+ ArgumentCaptor<String> contentCaptor =
ArgumentCaptor.forClass(String.class);
+ verify(storageLockClient, times(2)).writeObject(anyString(),
contentCaptor.capture());
+
+ // Validate dynamic expiration calculation
+ String[] firstLines =
contentCaptor.getAllValues().get(0).trim().split("\n");
+ @SuppressWarnings("unchecked")
+ Map<String, Object> firstRecord = OBJECT_MAPPER.readValue(firstLines[0],
Map.class);
+
+ String[] secondLines =
contentCaptor.getAllValues().get(1).trim().split("\n");
+ @SuppressWarnings("unchecked")
+ Map<String, Object> secondRecord = OBJECT_MAPPER.readValue(secondLines[1],
Map.class);
+
+ assertTrue((Boolean) firstRecord.get("lockHeld"));
+ assertEquals(timestamp + 1000L, ((Number)
firstRecord.get("lockExpiration")).longValue());
+
+ assertFalse((Boolean) secondRecord.get("lockHeld"));
+ assertEquals(timestamp + 1000L + 1000L, ((Number)
secondRecord.get("lockExpiration")).longValue());
+ }
+
+ @Test
+ void testFilePathGeneration() throws Exception {
+ when(storageLockClient.writeObject(anyString(),
anyString())).thenReturn(true);
+
+ auditService.recordOperation(AuditOperationState.START,
System.currentTimeMillis());
+
+ String expectedPath = String.format("%s/.hoodie/.locks/audit/%d_%s.jsonl",
+ BASE_PATH, TRANSACTION_START_TIME, OWNER_ID);
+
+ verify(storageLockClient).writeObject(eq(expectedPath), anyString());
+ }
+
+ @Test
+ void testCloseMethodWithBufferedData() throws Exception {
+ when(storageLockClient.writeObject(anyString(),
anyString())).thenReturn(true);
+
+ // Record an operation to buffer data
+ auditService.recordOperation(AuditOperationState.START,
System.currentTimeMillis());
+
+ // Close should not write additional data (data is written immediately
after each operation)
+ auditService.close();
+
+ // Verify write was called only once for the START operation
+ verify(storageLockClient, times(1)).writeObject(anyString(), anyString());
+ }
+
+ @Test
+ void testConcurrentOperations() throws Exception {
+ when(storageLockClient.writeObject(anyString(),
anyString())).thenReturn(true);
+ when(lockHeldSupplier.get()).thenReturn(true, false);
+
+ ExecutorService executor = Executors.newFixedThreadPool(2);
+ CountDownLatch startLatch = new CountDownLatch(1);
+ CountDownLatch completionLatch = new CountDownLatch(2);
+
+ long baseTime = System.currentTimeMillis();
+
+ // Concurrent RENEW and END operations
+ executor.submit(() -> {
+ try {
+ startLatch.await();
+ auditService.recordOperation(AuditOperationState.RENEW, baseTime +
1000);
+ } catch (Exception e) {
+ // Handle gracefully
+ } finally {
+ completionLatch.countDown();
+ }
+ });
+
+ executor.submit(() -> {
+ try {
+ startLatch.await();
+ auditService.recordOperation(AuditOperationState.END, baseTime + 1001);
+ } catch (Exception e) {
+ // Handle gracefully
+ } finally {
+ completionLatch.countDown();
+ }
+ });
+
+ startLatch.countDown();
+ assertTrue(completionLatch.await(5, TimeUnit.SECONDS));
+
+ // Verify both operations were recorded
+ ArgumentCaptor<String> contentCaptor =
ArgumentCaptor.forClass(String.class);
+ verify(storageLockClient, times(2)).writeObject(anyString(),
contentCaptor.capture());
+
+ String finalContent = contentCaptor.getValue();
+ String[] lines = finalContent.trim().split("\\n");
+ assertEquals(2, lines.length);
+
+ // Verify both RENEW and END states are present
+ @SuppressWarnings("unchecked")
+ Map<String, Object> record1 = OBJECT_MAPPER.readValue(lines[0], Map.class);
+ @SuppressWarnings("unchecked")
+ Map<String, Object> record2 = OBJECT_MAPPER.readValue(lines[1], Map.class);
+
+ String state1 = (String) record1.get("state");
+ String state2 = (String) record2.get("state");
+ assertTrue((state1.equals("RENEW") && state2.equals("END"))
+ || (state1.equals("END") && state2.equals("RENEW")));
+
+ executor.shutdown();
+ assertTrue(executor.awaitTermination(1, TimeUnit.SECONDS));
+ }
+}
diff --git
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
index ce86c409baf6..ffde5337397d 100644
---
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
+++
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
@@ -100,6 +100,8 @@ object HoodieProcedures {
,(ShowCleansProcedure.NAME, ShowCleansProcedure.builder)
,(ShowCleansPartitionMetadataProcedure.NAME,
ShowCleansPartitionMetadataProcedure.builder)
,(ShowCleansPlanProcedure.NAME, ShowCleansPlanProcedure.builder)
+ ,(SetAuditLockProcedure.NAME, SetAuditLockProcedure.builder)
+ ,(ShowAuditLockStatusProcedure.NAME,
ShowAuditLockStatusProcedure.builder)
)
}
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/SetAuditLockProcedure.scala
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/SetAuditLockProcedure.scala
new file mode 100644
index 000000000000..826572ad97fc
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/SetAuditLockProcedure.scala
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.command.procedures
+
+import org.apache.hudi.client.transaction.lock.StorageLockClient
+import
org.apache.hudi.client.transaction.lock.audit.StorageLockProviderAuditService
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.storage.StoragePath
+
+import com.fasterxml.jackson.databind.ObjectMapper
+import com.fasterxml.jackson.databind.node.ObjectNode
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.types.{DataTypes, Metadata, StructField,
StructType}
+
+import java.util.function.Supplier
+
+import scala.util.{Failure, Success, Try}
+
+
+/**
+ * Spark SQL procedure for enabling or disabling lock audit logging for Hudi
tables.
+ *
+ * This procedure allows users to control audit logging for storage lock
operations through
+ * Spark SQL commands. When enabled, lock operations will generate audit logs
in JSONL format
+ * that track lock lifecycle events.
+ *
+ * Usage:
+ * {{{
+ * CALL set_audit_lock(table => 'my_table', state => 'enabled')
+ * CALL set_audit_lock(path => '/path/to/table', state => 'disabled')
+ * }}}
+ *
+ * The procedure creates or updates an audit configuration file at:
+ * `{table_path}/.hoodie/.locks/audit_enabled.json`
+ *
+ * @author Apache Hudi
+ * @since 1.0.0
+ */
+class SetAuditLockProcedure extends BaseProcedure with ProcedureBuilder {
+ private val PARAMETERS = Array[ProcedureParameter](
+ ProcedureParameter.optional(0, "table", DataTypes.StringType),
+ ProcedureParameter.optional(1, "path", DataTypes.StringType),
+ ProcedureParameter.required(2, "state", DataTypes.StringType)
+ )
+
+ private val OUTPUT_TYPE = new StructType(Array[StructField](
+ StructField("table", DataTypes.StringType, nullable = false,
Metadata.empty),
+ StructField("audit_state", DataTypes.StringType, nullable = false,
Metadata.empty),
+ StructField("message", DataTypes.StringType, nullable = false,
Metadata.empty)
+ ))
+
+ private val OBJECT_MAPPER = new ObjectMapper()
+
+ /**
+ * Returns the procedure parameters definition.
+ *
+ * @return Array of parameters: table (optional String), path (optional
String), and state (required String)
+ */
+ def parameters: Array[ProcedureParameter] = PARAMETERS
+
+ /**
+ * Returns the output schema for the procedure result.
+ *
+ * @return StructType containing table, audit_state, and message columns
+ */
+ def outputType: StructType = OUTPUT_TYPE
+
+ /**
+ * Executes the audit lock set procedure.
+ *
+ * @param args Procedure arguments containing table name or path and desired
state
+ * @return Sequence containing a single Row with execution results
+ * @throws IllegalArgumentException if state parameter is not 'enabled' or
'disabled'
+ */
+ override def call(args: ProcedureArgs): Seq[Row] = {
+ super.checkArgs(PARAMETERS, args)
+
+ val tableName = getArgValueOrDefault(args, PARAMETERS(0))
+ val tablePath = getArgValueOrDefault(args, PARAMETERS(1))
+ val state = getArgValueOrDefault(args,
PARAMETERS(2)).get.asInstanceOf[String].toLowerCase
+
+ // Validate state parameter
+ if (state != "enabled" && state != "disabled") {
+ throw new IllegalArgumentException("State parameter must be 'enabled' or
'disabled'")
+ }
+
+ // Get the base path using BaseProcedure helper (handles table/path
validation)
+ val basePath: String = getBasePath(tableName, tablePath)
+ val metaClient = createMetaClient(jsc, basePath)
+
+ // Use table name if provided, otherwise extract from path
+ val displayName =
tableName.map(_.asInstanceOf[String]).getOrElse(tablePath.get.asInstanceOf[String])
+
+ try {
+ val auditEnabled = state == "enabled"
+ setAuditState(metaClient, basePath, auditEnabled)
+
+ val resultState = if (auditEnabled) "enabled" else "disabled"
+ val message = s"Lock audit logging successfully $resultState"
+
+ Seq(Row(displayName, resultState, message))
+ } catch {
+ case e: Exception =>
+ val errorMessage = s"Failed to set audit state: ${e.getMessage}"
+ Seq(Row(displayName, "error", errorMessage))
+ }
+ }
+
+ /**
+ * Sets the audit state by creating or updating the audit configuration file.
+ *
+ * @param metaClient Hudi table meta client for storage operations
+ * @param basePath Base path of the Hudi table
+ * @param enabled Whether audit logging should be enabled
+ * @throws RuntimeException if unable to write the audit configuration
+ */
+ private def setAuditState(metaClient: HoodieTableMetaClient, basePath:
String, enabled: Boolean): Unit = {
+ val storage = metaClient.getStorage
+ val lockFolderPath = StorageLockClient.getLockFolderPath(basePath)
+ val auditConfigPath = new
StoragePath(StorageLockProviderAuditService.getAuditConfigPath(basePath))
+
+ // Ensure the locks folder exists
+ if (!storage.exists(new StoragePath(lockFolderPath))) {
+ storage.createDirectory(new StoragePath(lockFolderPath))
+ }
+
+ // Create or update the audit configuration file
+ val jsonContent = createAuditConfig(enabled)
+
+ Try {
+ val outputStream = storage.create(auditConfigPath, true) // overwrite if
exists
+ try {
+ outputStream.write(jsonContent.getBytes("UTF-8"))
+ } finally {
+ outputStream.close()
+ }
+ } match {
+ case Success(_) =>
+ // Configuration written successfully
+ case Failure(exception) =>
+ throw new RuntimeException(s"Failed to write audit configuration to
${auditConfigPath.toString}", exception)
+ }
+ }
+
+ /**
+ * Creates the JSON configuration content for audit settings.
+ *
+ * @param enabled Whether audit logging should be enabled
+ * @return JSON string representation of the audit configuration
+ */
+ private def createAuditConfig(enabled: Boolean): String = {
+ val rootNode: ObjectNode = OBJECT_MAPPER.createObjectNode()
+
rootNode.put(StorageLockProviderAuditService.STORAGE_LOCK_AUDIT_SERVICE_ENABLED_FIELD,
enabled)
+ OBJECT_MAPPER.writeValueAsString(rootNode)
+ }
+
+ override def build: Procedure = new SetAuditLockProcedure()
+}
+
+/**
+ * Companion object for SetAuditLockProcedure containing constants and factory
methods.
+ */
+object SetAuditLockProcedure {
+ val NAME = "set_audit_lock"
+
+ /**
+ * Factory method to create procedure builder instances.
+ *
+ * @return Supplier that creates new SetAuditLockProcedure instances
+ */
+ def builder: Supplier[ProcedureBuilder] = () => new SetAuditLockProcedure()
+}
diff --git
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowAuditLockStatusProcedure.scala
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowAuditLockStatusProcedure.scala
new file mode 100644
index 000000000000..97cce27d4586
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowAuditLockStatusProcedure.scala
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.command.procedures
+
+import
org.apache.hudi.client.transaction.lock.audit.StorageLockProviderAuditService
+import org.apache.hudi.common.util.FileIOUtils
+import org.apache.hudi.storage.StoragePath
+
+import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper}
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.types.{DataTypes, Metadata, StructField,
StructType}
+
+import java.util.function.Supplier
+
+import scala.util.{Failure, Success, Try}
+
+/**
+ * Spark SQL procedure for showing the current audit logging status for Hudi
tables.
+ *
+ * This procedure allows users to check whether audit logging is currently
enabled
+ * or disabled for storage lock operations. It reads the audit configuration
file
+ * and reports the current status along with relevant paths.
+ *
+ * Usage:
+ * {{{
+ * CALL show_audit_lock_status(table => 'my_table')
+ * CALL show_audit_lock_status(path => '/path/to/table')
+ * }}}
+ *
+ * The procedure reads the audit configuration file from:
+ * `{table_path}/.hoodie/.locks/audit_enabled.json`
+ *
+ * @author Apache Hudi
+ * @since 1.0.0
+ */
+class ShowAuditLockStatusProcedure extends BaseProcedure with ProcedureBuilder
{
+ private val PARAMETERS = Array[ProcedureParameter](
+ ProcedureParameter.optional(0, "table", DataTypes.StringType),
+ ProcedureParameter.optional(1, "path", DataTypes.StringType)
+ )
+
+ private val OUTPUT_TYPE = new StructType(Array[StructField](
+ StructField("table", DataTypes.StringType, nullable = false,
Metadata.empty),
+ StructField("audit_enabled", DataTypes.BooleanType, nullable = false,
Metadata.empty),
+ StructField("config_path", DataTypes.StringType, nullable = false,
Metadata.empty),
+ StructField("audit_folder_path", DataTypes.StringType, nullable = false,
Metadata.empty)
+ ))
+
+ private val OBJECT_MAPPER = new ObjectMapper()
+
+ /**
+ * Returns the procedure parameters definition.
+ *
+ * @return Array of parameters: table (optional String) and path (optional
String)
+ */
+ def parameters: Array[ProcedureParameter] = PARAMETERS
+
+ /**
+ * Returns the output schema for the procedure result.
+ *
+ * @return StructType containing table, audit_enabled, config_path, and
audit_folder_path columns
+ */
+ def outputType: StructType = OUTPUT_TYPE
+
+ /**
+ * Executes the show audit lock status procedure.
+ *
+ * @param args Procedure arguments containing table name or path
+ * @return Sequence containing a single Row with status information
+ * @throws IllegalArgumentException if neither table nor path is provided,
or both are provided
+ */
+ override def call(args: ProcedureArgs): Seq[Row] = {
+ super.checkArgs(PARAMETERS, args)
+
+ val tableName = getArgValueOrDefault(args, PARAMETERS(0))
+ val tablePath = getArgValueOrDefault(args, PARAMETERS(1))
+
+ // Get the base path using BaseProcedure helper (handles table/path
validation)
+ val basePath: String = getBasePath(tableName, tablePath)
+ val metaClient = createMetaClient(jsc, basePath)
+
+ // Use table name if provided, otherwise extract from path
+ val displayName =
tableName.map(_.asInstanceOf[String]).getOrElse(tablePath.get.asInstanceOf[String])
+
+ try {
+ val auditStatus = checkAuditStatus(metaClient, basePath)
+ val configPath =
StorageLockProviderAuditService.getAuditConfigPath(basePath)
+ val auditFolderPath =
StorageLockProviderAuditService.getAuditFolderPath(basePath)
+
+ Seq(Row(displayName, auditStatus, configPath, auditFolderPath))
+ } catch {
+ case e: Exception =>
+ // Return false for audit status if we can't read the config
+ val configPath =
StorageLockProviderAuditService.getAuditConfigPath(basePath)
+ val auditFolderPath =
StorageLockProviderAuditService.getAuditFolderPath(basePath)
+ Seq(Row(displayName, false, configPath, auditFolderPath))
+ }
+ }
+
+ /**
+ * Checks the current audit status by reading the audit configuration file.
+ *
+ * @param metaClient Hudi table meta client for storage operations
+ * @param basePath Base path of the Hudi table
+ * @return true if audit logging is enabled, false otherwise
+ */
+ private def checkAuditStatus(metaClient:
org.apache.hudi.common.table.HoodieTableMetaClient, basePath: String): Boolean
= {
+ val storage = metaClient.getStorage
+ val auditConfigPath = new
StoragePath(StorageLockProviderAuditService.getAuditConfigPath(basePath))
+
+ if (!storage.exists(auditConfigPath)) {
+ false
+ } else {
+
+ Try {
+ val inputStream = storage.open(auditConfigPath)
+ try {
+ val configContent = new
String(FileIOUtils.readAsByteArray(inputStream))
+ val rootNode: JsonNode = OBJECT_MAPPER.readTree(configContent)
+ val enabledNode =
rootNode.get(StorageLockProviderAuditService.STORAGE_LOCK_AUDIT_SERVICE_ENABLED_FIELD)
+ Option(enabledNode).exists(_.asBoolean(false))
+ } finally {
+ inputStream.close()
+ }
+ } match {
+ case Success(enabled) => enabled
+ case Failure(_) => false
+ }
+ }
+ }
+
+ /**
+ * Builds a new instance of the ShowAuditLockStatusProcedure.
+ *
+ * @return New ShowAuditLockStatusProcedure instance
+ */
+ override def build: Procedure = new ShowAuditLockStatusProcedure()
+}
+
+/**
+ * Companion object for ShowAuditLockStatusProcedure containing constants and
factory methods.
+ */
+object ShowAuditLockStatusProcedure {
+ /** The name used to register and invoke this procedure */
+ val NAME = "show_audit_lock_status"
+
+ /**
+ * Factory method to create procedure builder instances.
+ *
+ * @return Supplier that creates new ShowAuditLockStatusProcedure instances
+ */
+ def builder: Supplier[ProcedureBuilder] = () => new
ShowAuditLockStatusProcedure()
+}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestSetAuditLockProcedure.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestSetAuditLockProcedure.scala
new file mode 100644
index 000000000000..d8c2c79d8a57
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestSetAuditLockProcedure.scala
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.procedure
+
+import java.io.File
+
+/**
+ * Test suite for the SetAuditLockProcedure Spark SQL procedure.
+ *
+ * This class contains comprehensive tests to verify the functionality of
+ * the set_audit_lock procedure, including enabling/disabling audit logging,
+ * parameter validation, and error handling scenarios.
+ *
+ * @author Apache Hudi
+ * @since 1.1.0
+ */
+class TestSetAuditLockProcedure extends HoodieSparkProcedureTestBase {
+
+ override def generateTableName: String = {
+ super.generateTableName.split("\\.").last
+ }
+
+ /**
+ * Helper method to create a test table and return its path.
+ */
+ private def createTestTable(tmp: File, tableName: String): String = {
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int,
+ | name string,
+ | price double,
+ | ts long
+ |) using hudi
+ | location '${tmp.getCanonicalPath}/$tableName'
+ | tblproperties (
+ | primaryKey = 'id',
+ | orderingFields = 'ts'
+ | )
+ """.stripMargin)
+ // Insert data to initialize the Hudi metadata structure
+ spark.sql(s"insert into $tableName select 1, 'test', 10.0, 1000")
+ s"${tmp.getCanonicalPath}/$tableName"
+ }
+
+ /**
+ * Test enabling audit logging using table name parameter.
+ */
+ test("Test Call set_audit_lock Procedure - Enable Audit with Table Name") {
+ withTempDir { tmp =>
+ val tableName = generateTableName
+ val tablePath = createTestTable(tmp, tableName)
+
+ val result = spark.sql(s"""call set_audit_lock(table => '$tableName',
state => 'enabled')""").collect()
+
+ assertResult(1)(result.length)
+ assertResult(tableName)(result.head.get(0))
+ assertResult("enabled")(result.head.get(1))
+ assert(result.head.get(2).toString.contains("successfully enabled"))
+ }
+ }
+
+ /**
+ * Test enabling audit logging using path parameter.
+ */
+ test("Test Call set_audit_lock Procedure - Enable Audit with Path") {
+ withTempDir { tmp =>
+ val tableName = generateTableName
+ val tablePath = createTestTable(tmp, tableName)
+
+ val result = spark.sql(s"""call set_audit_lock(path => '$tablePath',
state => 'enabled')""").collect()
+
+ assertResult(1)(result.length)
+ assertResult(tablePath)(result.head.get(0))
+ assertResult("enabled")(result.head.get(1))
+ assert(result.head.get(2).toString.contains("successfully enabled"))
+ }
+ }
+
+ /**
+ * Test disabling audit logging using table name parameter.
+ */
+ test("Test Call set_audit_lock Procedure - Disable Audit with Table Name") {
+ withTempDir { tmp =>
+ val tableName = generateTableName
+ val tablePath = createTestTable(tmp, tableName)
+
+ val result = spark.sql(s"""call set_audit_lock(table => '$tableName',
state => 'disabled')""").collect()
+
+ assertResult(1)(result.length)
+ assertResult(tableName)(result.head.get(0))
+ assertResult("disabled")(result.head.get(1))
+ assert(result.head.get(2).toString.contains("successfully disabled"))
+ }
+ }
+
+ /**
+ * Test disabling audit logging using path parameter.
+ */
+ test("Test Call set_audit_lock Procedure - Disable Audit with Path") {
+ withTempDir { tmp =>
+ val tableName = generateTableName
+ val tablePath = createTestTable(tmp, tableName)
+
+ val result = spark.sql(s"""call set_audit_lock(path => '$tablePath',
state => 'disabled')""").collect()
+
+ assertResult(1)(result.length)
+ assertResult(tablePath)(result.head.get(0))
+ assertResult("disabled")(result.head.get(1))
+ assert(result.head.get(2).toString.contains("successfully disabled"))
+ }
+ }
+
+ /**
+ * Test parameter validation by providing an invalid state parameter.
+ * Verifies that the procedure rejects invalid state values and provides
+ * an appropriate error message.
+ */
+ test("Test Call set_audit_lock Procedure - Invalid State Parameter") {
+ withTempDir { tmp =>
+ val tableName = generateTableName
+ createTestTable(tmp, tableName)
+
+ // Test invalid state parameter
+ checkExceptionContain(s"""call set_audit_lock(table => '$tableName',
state => 'invalid')""")(
+ "State parameter must be 'enabled' or 'disabled'")
+ }
+ }
+
+ /**
+ * Test parameter validation by omitting required arguments.
+ * Verifies that the procedure properly validates required parameters
+ * and provides appropriate error messages for missing arguments.
+ */
+ test("Test Call set_audit_lock Procedure - Missing Required Arguments") {
+ withTempDir { tmp =>
+ val tableName = generateTableName
+ val tablePath = createTestTable(tmp, tableName)
+
+ // Test missing both table and path parameters
+ checkExceptionContain(s"""call set_audit_lock(state => 'enabled')""")(
+ "Table name or table path must be given one")
+
+ // Test missing state parameter
+ checkExceptionContain(s"""call set_audit_lock(table => '$tableName')""")(
+ "Argument: state is required")
+
+ // Test providing both table and path parameters - should work fine
(uses table parameter)
+ val result = spark.sql(s"""call set_audit_lock(table => '$tableName',
path => '$tablePath', state => 'enabled')""").collect()
+ assertResult(1)(result.length)
+ assertResult(tableName)(result.head.getString(0))
+ }
+ }
+}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestShowAuditLockStatusProcedure.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestShowAuditLockStatusProcedure.scala
new file mode 100644
index 000000000000..87cf04967694
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestShowAuditLockStatusProcedure.scala
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.procedure
+
+import java.io.File
+
+/**
+ * Test suite for the ShowAuditLockStatusProcedure Spark SQL procedure.
+ *
+ * This class contains comprehensive tests to verify the functionality of
+ * the show_audit_lock_status procedure, including status checking with both
+ * table name and path parameters, and various audit states.
+ *
+ * @author Apache Hudi
+ * @since 1.1.0
+ */
+class TestShowAuditLockStatusProcedure extends HoodieSparkProcedureTestBase {
+
+ override def generateTableName: String = {
+ super.generateTableName.split("\\.").last
+ }
+
+ /**
+ * Helper method to create a test table and return its path.
+ */
+ private def createTestTable(tmp: File, tableName: String): String = {
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int,
+ | name string,
+ | price double,
+ | ts long
+ |) using hudi
+ | location '${tmp.getCanonicalPath}/$tableName'
+ | tblproperties (
+ | primaryKey = 'id',
+ | orderingFields = 'ts'
+ | )
+ """.stripMargin)
+ // Insert data to initialize the Hudi metadata structure
+ spark.sql(s"insert into $tableName select 1, 'test', 10.0, 1000")
+ s"${tmp.getCanonicalPath}/$tableName"
+ }
+
+ /**
+ * Test showing audit status when audit is disabled (default state) using
table name.
+ */
+ test("Test Show Audit Status - Disabled with Table Name") {
+ withTempDir { tmp =>
+ val tableName = generateTableName
+ createTestTable(tmp, tableName)
+
+ val result = spark.sql(s"""call show_audit_lock_status(table =>
'$tableName')""").collect()
+
+ assertResult(1)(result.length)
+ assertResult(tableName)(result.head.get(0)) // table name
+ assertResult(false)(result.head.get(1)) // audit_enabled
+ assert(result.head.get(2).toString.contains("audit_enabled.json")) //
config_path
+ assert(result.head.get(3).toString.contains("audit")) //
audit_folder_path
+ }
+ }
+
+ /**
+ * Test showing audit status when audit is disabled (default state) using
path.
+ */
+ test("Test Show Audit Status - Disabled with Path") {
+ withTempDir { tmp =>
+ val tableName = generateTableName
+ val tablePath = createTestTable(tmp, tableName)
+
+ val result = spark.sql(s"""call show_audit_lock_status(path =>
'$tablePath')""").collect()
+
+ assertResult(1)(result.length)
+ assertResult(tablePath)(result.head.get(0)) // path
+ assertResult(false)(result.head.get(1)) // audit_enabled
+ assert(result.head.get(2).toString.contains("audit_enabled.json")) //
config_path
+ assert(result.head.get(3).toString.contains("audit")) //
audit_folder_path
+ }
+ }
+
+ /**
+ * Test showing audit status when audit is enabled using table name.
+ */
+ test("Test Show Audit Status - Enabled with Table Name") {
+ withTempDir { tmp =>
+ val tableName = generateTableName
+ createTestTable(tmp, tableName)
+
+ // First enable audit logging
+ spark.sql(s"""call set_audit_lock(table => '$tableName', state =>
'enabled')""")
+
+ // Then check the status
+ val result = spark.sql(s"""call show_audit_lock_status(table =>
'$tableName')""").collect()
+
+ assertResult(1)(result.length)
+ assertResult(tableName)(result.head.get(0)) // table name
+ assertResult(true)(result.head.get(1)) // audit_enabled
+ assert(result.head.get(2).toString.contains("audit_enabled.json")) //
config_path
+ assert(result.head.get(3).toString.contains("audit")) //
audit_folder_path
+ }
+ }
+
+ /**
+ * Test showing audit status when audit is enabled using path.
+ */
+ test("Test Show Audit Status - Enabled with Path") {
+ withTempDir { tmp =>
+ val tableName = generateTableName
+ val tablePath = createTestTable(tmp, tableName)
+
+ // First enable audit logging
+ spark.sql(s"""call set_audit_lock(path => '$tablePath', state =>
'enabled')""")
+
+ // Then check the status
+ val result = spark.sql(s"""call show_audit_lock_status(path =>
'$tablePath')""").collect()
+
+ assertResult(1)(result.length)
+ assertResult(tablePath)(result.head.get(0)) // path
+ assertResult(true)(result.head.get(1)) // audit_enabled
+ assert(result.head.get(2).toString.contains("audit_enabled.json")) //
config_path
+ assert(result.head.get(3).toString.contains("audit")) //
audit_folder_path
+ }
+ }
+
+ /**
+ * Test audit status after enabling and then disabling audit.
+ * Verifies that the status correctly reflects the disabled state.
+ */
+ test("Test Show Audit Status After Enable and Disable") {
+ withTempDir { tmp =>
+ val tableName = generateTableName
+ createTestTable(tmp, tableName)
+
+ // Enable audit
+ spark.sql(s"""call set_audit_lock(table => '$tableName', state =>
'enabled')""")
+ // Verify enabled status
+ val enabledResult = spark.sql(s"""call show_audit_lock_status(table =>
'$tableName')""").collect()
+ assertResult(true)(enabledResult.head.get(1))
+
+ // Disable audit
+ spark.sql(s"""call set_audit_lock(table => '$tableName', state =>
'disabled')""")
+ // Verify disabled status
+ val disabledResult = spark.sql(s"""call show_audit_lock_status(table =>
'$tableName')""").collect()
+ assertResult(false)(disabledResult.head.get(1))
+ }
+ }
+
+ /**
+ * Test parameter validation by omitting required arguments.
+ * Verifies that the procedure properly validates required parameters.
+ */
+ test("Test Show Audit Status - Missing Required Arguments") {
+ withTempDir { tmp =>
+ val tableName = generateTableName
+ val tablePath = createTestTable(tmp, tableName)
+
+ // Test missing both table and path parameters
+ checkExceptionContain(s"""call show_audit_lock_status()""")(
+ "Table name or table path must be given one")
+
+ // Test providing both table and path parameters - should work fine
(uses table parameter)
+ val result = spark.sql(s"""call show_audit_lock_status(table =>
'$tableName', path => '$tablePath')""").collect()
+ assertResult(1)(result.length)
+ assertResult(tableName)(result.head.getString(0))
+ }
+ }
+
+ /**
+ * Test showing audit status for a table that doesn't exist.
+ * Verifies graceful handling of non-existent tables.
+ */
+ test("Test Show Audit Status - Non-existent Table") {
+ val nonExistentTable = "non_existent_table_" + System.currentTimeMillis()
+ // This should throw an exception for non-existent table
+ assertThrows[Exception] {
+ spark.sql(s"""call show_audit_lock_status(table =>
'$nonExistentTable')""").collect()
+ }
+ }
+
+ /**
+ * Test output schema verification.
+ * Ensures the procedure returns the correct column structure.
+ */
+ test("Test Show Audit Status - Output Schema") {
+ withTempDir { tmp =>
+ val tableName = generateTableName
+ createTestTable(tmp, tableName)
+
+ val result = spark.sql(s"""call show_audit_lock_status(table =>
'$tableName')""")
+ val schema = result.schema
+
+ // Verify column count and names
+ assertResult(4)(schema.fields.length)
+ assertResult("table")(schema.fields(0).name)
+ assertResult("audit_enabled")(schema.fields(1).name)
+ assertResult("config_path")(schema.fields(2).name)
+ assertResult("audit_folder_path")(schema.fields(3).name)
+
+ // Verify column types
+ assertResult("string")(schema.fields(0).dataType.typeName)
+ assertResult("boolean")(schema.fields(1).dataType.typeName)
+ assertResult("string")(schema.fields(2).dataType.typeName)
+ assertResult("string")(schema.fields(3).dataType.typeName)
+ }
+ }
+
+}