This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 7232d2461f4 [HUDI-8005] Add a new DynamoDB based lock provider with 
auto-generated partition key (#11667)
7232d2461f4 is described below

commit 7232d2461f4295c80001df73da2ddc150f84c8cd
Author: Davis-Zhang-Onehouse 
<169106455+davis-zhang-oneho...@users.noreply.github.com>
AuthorDate: Thu Jul 25 08:43:50 2024 -0700

    [HUDI-8005] Add a new DynamoDB based lock provider with auto-generated 
partition key (#11667)
---
 ...amoDBBasedImplicitPartitionKeyLockProvider.java |  67 +++++++
 .../lock/DynamoDBBasedLockProvider.java            | 187 ++----------------
 ...der.java => DynamoDBBasedLockProviderBase.java} |  91 +++++----
 .../hudi/config/DynamoDbBasedLockConfig.java       |  15 +-
 .../integ/ITTestDynamoDBBasedLockProvider.java     | 211 ++++++++++++++++-----
 .../org/apache/hudi/common/util/hash/HashID.java   |  22 ++-
 .../apache/hudi/common/util/hash/TestHashID.java   |  40 +++-
 .../src/test/resources/hash/magic_input.txt        |  10 +
 .../hash/xxhash_BITS_128_for_magic_input.txt       |  10 +
 .../hash/xxhash_BITS_32_for_magic_input.txt        |  10 +
 .../hash/xxhash_BITS_64_for_magic_input.txt        |  10 +
 .../org/apache/hudi/common/util/StringUtils.java   |  54 ++++++
 .../apache/hudi/common/util/TestStringUtils.java   |  92 ++++++++-
 scripts/release/validate_source_copyright.sh       |   4 +-
 14 files changed, 533 insertions(+), 290 deletions(-)

diff --git 
a/hudi-aws/src/main/java/org/apache/hudi/aws/transaction/lock/DynamoDBBasedImplicitPartitionKeyLockProvider.java
 
b/hudi-aws/src/main/java/org/apache/hudi/aws/transaction/lock/DynamoDBBasedImplicitPartitionKeyLockProvider.java
new file mode 100644
index 00000000000..96f8f7f30e4
--- /dev/null
+++ 
b/hudi-aws/src/main/java/org/apache/hudi/aws/transaction/lock/DynamoDBBasedImplicitPartitionKeyLockProvider.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.aws.transaction.lock;
+
+import org.apache.hudi.common.config.HoodieCommonConfig;
+import org.apache.hudi.common.config.LockConfiguration;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.util.hash.HashID;
+import org.apache.hudi.storage.StorageConfiguration;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+import static org.apache.hudi.common.util.StringUtils.concatenateWithThreshold;
+import static 
org.apache.hudi.config.DynamoDbBasedLockConfig.MAX_PARTITION_KEY_SIZE_BYTE;
+
+/**
+ * A DynamoDB based lock.
+ * It implicitly derives the partition key from the hudi table name and hudi 
table base path
+ * available in the lock configuration.
+ */
+@NotThreadSafe
+public class DynamoDBBasedImplicitPartitionKeyLockProvider extends 
DynamoDBBasedLockProviderBase {
+  protected static final Logger LOG = 
LoggerFactory.getLogger(DynamoDBBasedImplicitPartitionKeyLockProvider.class);
+
+  public DynamoDBBasedImplicitPartitionKeyLockProvider(final LockConfiguration 
lockConfiguration, final StorageConfiguration<?> conf) {
+    this(lockConfiguration, conf, null);
+  }
+
+  public DynamoDBBasedImplicitPartitionKeyLockProvider(
+      final LockConfiguration lockConfiguration, final StorageConfiguration<?> 
conf, DynamoDbClient dynamoDB) {
+    super(lockConfiguration, conf, dynamoDB);
+  }
+
+  public static String generatePartitionKey(String basePath, String tableName) 
{
+    String hashPart = '-' + HashID.generateXXHashAsString(basePath, 
HashID.Size.BITS_64);
+    String partitionKey = concatenateWithThreshold(tableName, hashPart, 
MAX_PARTITION_KEY_SIZE_BYTE);
+    LOG.info(String.format("Partition key for base path %s is %s", basePath, 
partitionKey));
+    return partitionKey;
+  }
+
+  @Override
+  public String getDynamoDBPartitionKey(LockConfiguration lockConfiguration) {
+    String hudiTableBasePath = 
lockConfiguration.getConfig().getString(HoodieCommonConfig.BASE_PATH.key());
+    String hudiTableName = 
lockConfiguration.getConfig().getString(HoodieTableConfig.HOODIE_TABLE_NAME_KEY);
+    return generatePartitionKey(hudiTableBasePath, hudiTableName);
+  }
+}
diff --git 
a/hudi-aws/src/main/java/org/apache/hudi/aws/transaction/lock/DynamoDBBasedLockProvider.java
 
b/hudi-aws/src/main/java/org/apache/hudi/aws/transaction/lock/DynamoDBBasedLockProvider.java
index bbdcb4e7003..2876513b11e 100644
--- 
a/hudi-aws/src/main/java/org/apache/hudi/aws/transaction/lock/DynamoDBBasedLockProvider.java
+++ 
b/hudi-aws/src/main/java/org/apache/hudi/aws/transaction/lock/DynamoDBBasedLockProvider.java
@@ -18,200 +18,39 @@
 
 package org.apache.hudi.aws.transaction.lock;
 
-import org.apache.hudi.aws.credentials.HoodieAWSCredentialsProviderFactory;
-import org.apache.hudi.aws.utils.DynamoTableUtils;
 import org.apache.hudi.common.config.LockConfiguration;
-import org.apache.hudi.common.lock.LockProvider;
-import org.apache.hudi.common.lock.LockState;
-import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.config.DynamoDbBasedLockConfig;
-import org.apache.hudi.exception.HoodieLockException;
 import org.apache.hudi.storage.StorageConfiguration;
 
-import com.amazonaws.services.dynamodbv2.AcquireLockOptions;
-import com.amazonaws.services.dynamodbv2.AmazonDynamoDBLockClient;
-import com.amazonaws.services.dynamodbv2.AmazonDynamoDBLockClientOptions;
-import com.amazonaws.services.dynamodbv2.LockItem;
-import com.amazonaws.services.dynamodbv2.model.LockNotGrantedException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import software.amazon.awssdk.regions.Region;
 import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
-import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition;
-import software.amazon.awssdk.services.dynamodb.model.BillingMode;
-import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest;
-import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement;
-import software.amazon.awssdk.services.dynamodb.model.KeyType;
-import software.amazon.awssdk.services.dynamodb.model.ProvisionedThroughput;
-import software.amazon.awssdk.services.dynamodb.model.ScalarAttributeType;
 
 import javax.annotation.concurrent.NotThreadSafe;
 
-import java.io.Serializable;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
+import static 
org.apache.hudi.config.DynamoDbBasedLockConfig.DYNAMODB_LOCK_PARTITION_KEY;
 
 /**
- * A DynamoDB based lock. This {@link LockProvider} implementation allows to 
lock table operations
- * using DynamoDB. Users need to have access to AWS DynamoDB to be able to use 
this lock.
+ * A DynamoDB based lock.
+ * It expects partition key is explicitly available in DynamoDbBasedLockConfig.
  */
 @NotThreadSafe
-public class DynamoDBBasedLockProvider implements LockProvider<LockItem>, 
Serializable {
-
-  private static final Logger LOG = 
LoggerFactory.getLogger(DynamoDBBasedLockProvider.class);
-
-  private static final String DYNAMODB_ATTRIBUTE_NAME = "key";
-
-  private final transient AmazonDynamoDBLockClient client;
-  private final String tableName;
-  private final String dynamoDBPartitionKey;
-  protected final DynamoDbBasedLockConfig dynamoDBLockConfiguration;
-  private volatile LockItem lock;
+public class DynamoDBBasedLockProvider extends DynamoDBBasedLockProviderBase {
 
   public DynamoDBBasedLockProvider(final LockConfiguration lockConfiguration, 
final StorageConfiguration<?> conf) {
     this(lockConfiguration, conf, null);
   }
 
   public DynamoDBBasedLockProvider(final LockConfiguration lockConfiguration, 
final StorageConfiguration<?> conf, DynamoDbClient dynamoDB) {
-    this.dynamoDBLockConfiguration = DynamoDbBasedLockConfig.newBuilder()
-        .fromProperties(lockConfiguration.getConfig())
-        .build();
-    this.tableName = 
dynamoDBLockConfiguration.getString(DynamoDbBasedLockConfig.DYNAMODB_LOCK_TABLE_NAME);
-    this.dynamoDBPartitionKey = 
dynamoDBLockConfiguration.getString(DynamoDbBasedLockConfig.DYNAMODB_LOCK_PARTITION_KEY);
-    long leaseDuration = 
dynamoDBLockConfiguration.getInt(DynamoDbBasedLockConfig.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY);
-    if (dynamoDB == null) {
-      dynamoDB = getDynamoDBClient();
-    }
-    // build the dynamoDb lock client
-    this.client = new AmazonDynamoDBLockClient(
-        AmazonDynamoDBLockClientOptions.builder(dynamoDB, tableName)
-            .withTimeUnit(TimeUnit.MILLISECONDS)
-            .withLeaseDuration(leaseDuration)
-            .withHeartbeatPeriod(leaseDuration / 3)
-            .withCreateHeartbeatBackgroundThread(true)
-            .build());
-
-    if (!this.client.lockTableExists()) {
-      createLockTableInDynamoDB(dynamoDB, tableName);
-    }
-  }
-
-  @Override
-  public boolean tryLock(long time, TimeUnit unit) {
-    LOG.info(generateLogStatement(LockState.ACQUIRING, 
generateLogSuffixString()));
-    try {
-      lock = 
client.acquireLock(AcquireLockOptions.builder(dynamoDBPartitionKey)
-              .withAdditionalTimeToWaitForLock(time)
-              .withTimeUnit(TimeUnit.MILLISECONDS)
-              .build());
-      LOG.info(generateLogStatement(LockState.ACQUIRED, 
generateLogSuffixString()));
-    } catch (InterruptedException e) {
-      throw new 
HoodieLockException(generateLogStatement(LockState.FAILED_TO_ACQUIRE, 
generateLogSuffixString()), e);
-    } catch (LockNotGrantedException e) {
-      return false;
-    }
-    return lock != null && !lock.isExpired();
+    super(lockConfiguration, conf, dynamoDB);
   }
 
   @Override
-  public void unlock() {
-    try {
-      LOG.info(generateLogStatement(LockState.RELEASING, 
generateLogSuffixString()));
-      if (lock == null) {
-        return;
-      }
-      if (!client.releaseLock(lock)) {
-        LOG.warn("The lock has already been stolen");
-      }
-      lock = null;
-      LOG.info(generateLogStatement(LockState.RELEASED, 
generateLogSuffixString()));
-    } catch (Exception e) {
-      throw new 
HoodieLockException(generateLogStatement(LockState.FAILED_TO_RELEASE, 
generateLogSuffixString()), e);
-    }
-  }
-
-  @Override
-  public void close() {
-    try {
-      if (lock != null) {
-        if (!client.releaseLock(lock)) {
-          LOG.warn("The lock has already been stolen");
-        }
-        lock = null;
-      }
-      this.client.close();
-    } catch (Exception e) {
-      LOG.error(generateLogStatement(LockState.FAILED_TO_RELEASE, 
generateLogSuffixString()));
-    }
-  }
-
-  @Override
-  public LockItem getLock() {
-    return lock;
-  }
-
-  private DynamoDbClient getDynamoDBClient() {
-    String region = 
this.dynamoDBLockConfiguration.getString(DynamoDbBasedLockConfig.DYNAMODB_LOCK_REGION);
-    String endpointURL = 
this.dynamoDBLockConfiguration.contains(DynamoDbBasedLockConfig.DYNAMODB_ENDPOINT_URL.key())
-            ? 
this.dynamoDBLockConfiguration.getString(DynamoDbBasedLockConfig.DYNAMODB_ENDPOINT_URL)
-            : 
DynamoDbClient.serviceMetadata().endpointFor(Region.of(region)).toString();
-
-    if (!endpointURL.startsWith("https://") && 
!endpointURL.startsWith("http://")) {
-      endpointURL = "https://" + endpointURL;
-    }
-
-    return DynamoDbClient.builder()
-            .endpointOverride(URI.create(endpointURL))
-            
.credentialsProvider(HoodieAWSCredentialsProviderFactory.getAwsCredentialsProvider(dynamoDBLockConfiguration.getProps()))
-            .build();
-  }
-
-  private void createLockTableInDynamoDB(DynamoDbClient dynamoDB, String 
tableName) {
-    String billingMode = 
dynamoDBLockConfiguration.getString(DynamoDbBasedLockConfig.DYNAMODB_LOCK_BILLING_MODE);
-    KeySchemaElement partitionKeyElement = KeySchemaElement
-            .builder()
-            .attributeName(DYNAMODB_ATTRIBUTE_NAME)
-            .keyType(KeyType.HASH)
-            .build();
-
-    List<KeySchemaElement> keySchema = new ArrayList<>();
-    keySchema.add(partitionKeyElement);
-
-    Collection<AttributeDefinition> attributeDefinitions = new ArrayList<>();
-    
attributeDefinitions.add(AttributeDefinition.builder().attributeName(DYNAMODB_ATTRIBUTE_NAME).attributeType(ScalarAttributeType.S).build());
-    CreateTableRequest.Builder createTableRequestBuilder = 
CreateTableRequest.builder();
-    if (billingMode.equals(BillingMode.PROVISIONED.name())) {
-      
createTableRequestBuilder.provisionedThroughput(ProvisionedThroughput.builder()
-              
.readCapacityUnits(dynamoDBLockConfiguration.getLong(DynamoDbBasedLockConfig.DYNAMODB_LOCK_READ_CAPACITY))
-              
.writeCapacityUnits(dynamoDBLockConfiguration.getLong(DynamoDbBasedLockConfig.DYNAMODB_LOCK_WRITE_CAPACITY))
-              .build());
-    }
-    createTableRequestBuilder.tableName(tableName)
-            .keySchema(keySchema)
-            .attributeDefinitions(attributeDefinitions)
-            .billingMode(billingMode);
-    dynamoDB.createTable(createTableRequestBuilder.build());
-
-    LOG.info("Creating dynamoDB table " + tableName + ", waiting for table to 
be active");
-    try {
-
-      DynamoTableUtils.waitUntilActive(dynamoDB, tableName, 
dynamoDBLockConfiguration.getInt(DynamoDbBasedLockConfig.DYNAMODB_LOCK_TABLE_CREATION_TIMEOUT),
 20 * 1000);
-    } catch (DynamoTableUtils.TableNeverTransitionedToStateException e) {
-      throw new HoodieLockException("Created dynamoDB table never transits to 
active", e);
-    } catch (InterruptedException e) {
-      throw new HoodieLockException("Thread interrupted while waiting for 
dynamoDB table to turn active", e);
-    }
-    LOG.info("Created dynamoDB table " + tableName);
-  }
-
-  private String generateLogSuffixString() {
-    return StringUtils.join("DynamoDb table = ", tableName, ", partition key = 
", dynamoDBPartitionKey);
-  }
-
-  protected String generateLogStatement(LockState state, String suffix) {
-    return StringUtils.join(state.name(), " lock at ", suffix);
+  public String getDynamoDBPartitionKey(LockConfiguration lockConfiguration) {
+    DynamoDbBasedLockConfig config = new DynamoDbBasedLockConfig.Builder()
+        .fromProperties(lockConfiguration.getConfig()).build();
+    ValidationUtils.checkArgument(
+        config.contains(DYNAMODB_LOCK_PARTITION_KEY),
+        "Config key is not found: " + DYNAMODB_LOCK_PARTITION_KEY.key());
+    return config.getString(DYNAMODB_LOCK_PARTITION_KEY);
   }
 }
diff --git 
a/hudi-aws/src/main/java/org/apache/hudi/aws/transaction/lock/DynamoDBBasedLockProvider.java
 
b/hudi-aws/src/main/java/org/apache/hudi/aws/transaction/lock/DynamoDBBasedLockProviderBase.java
similarity index 70%
copy from 
hudi-aws/src/main/java/org/apache/hudi/aws/transaction/lock/DynamoDBBasedLockProvider.java
copy to 
hudi-aws/src/main/java/org/apache/hudi/aws/transaction/lock/DynamoDBBasedLockProviderBase.java
index bbdcb4e7003..e9a3228025f 100644
--- 
a/hudi-aws/src/main/java/org/apache/hudi/aws/transaction/lock/DynamoDBBasedLockProvider.java
+++ 
b/hudi-aws/src/main/java/org/apache/hudi/aws/transaction/lock/DynamoDBBasedLockProviderBase.java
@@ -47,44 +47,47 @@ import 
software.amazon.awssdk.services.dynamodb.model.ScalarAttributeType;
 
 import javax.annotation.concurrent.NotThreadSafe;
 
-import java.io.Serializable;
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
+import static 
org.apache.hudi.config.DynamoDbBasedLockConfig.DYNAMODB_ENDPOINT_URL;
+import static 
org.apache.hudi.config.DynamoDbBasedLockConfig.DYNAMODB_LOCK_BILLING_MODE;
+import static 
org.apache.hudi.config.DynamoDbBasedLockConfig.DYNAMODB_LOCK_READ_CAPACITY;
+import static 
org.apache.hudi.config.DynamoDbBasedLockConfig.DYNAMODB_LOCK_REGION;
+import static 
org.apache.hudi.config.DynamoDbBasedLockConfig.DYNAMODB_LOCK_TABLE_CREATION_TIMEOUT;
+import static 
org.apache.hudi.config.DynamoDbBasedLockConfig.DYNAMODB_LOCK_WRITE_CAPACITY;
+
 /**
  * A DynamoDB based lock. This {@link LockProvider} implementation allows to 
lock table operations
  * using DynamoDB. Users need to have access to AWS DynamoDB to be able to use 
this lock.
  */
 @NotThreadSafe
-public class DynamoDBBasedLockProvider implements LockProvider<LockItem>, 
Serializable {
+public abstract class DynamoDBBasedLockProviderBase implements 
LockProvider<LockItem> {
 
-  private static final Logger LOG = 
LoggerFactory.getLogger(DynamoDBBasedLockProvider.class);
+  protected static final Logger LOG = 
LoggerFactory.getLogger(DynamoDBBasedLockProviderBase.class);
 
-  private static final String DYNAMODB_ATTRIBUTE_NAME = "key";
+  protected static final String DYNAMODB_ATTRIBUTE_NAME = "key";
 
-  private final transient AmazonDynamoDBLockClient client;
-  private final String tableName;
+  protected final DynamoDbBasedLockConfig dynamoDbBasedLockConfig;
+  protected final AmazonDynamoDBLockClient client;
+  protected final String tableName;
   private final String dynamoDBPartitionKey;
-  protected final DynamoDbBasedLockConfig dynamoDBLockConfiguration;
-  private volatile LockItem lock;
-
-  public DynamoDBBasedLockProvider(final LockConfiguration lockConfiguration, 
final StorageConfiguration<?> conf) {
-    this(lockConfiguration, conf, null);
-  }
+  protected volatile LockItem lock;
 
-  public DynamoDBBasedLockProvider(final LockConfiguration lockConfiguration, 
final StorageConfiguration<?> conf, DynamoDbClient dynamoDB) {
-    this.dynamoDBLockConfiguration = DynamoDbBasedLockConfig.newBuilder()
-        .fromProperties(lockConfiguration.getConfig())
-        .build();
-    this.tableName = 
dynamoDBLockConfiguration.getString(DynamoDbBasedLockConfig.DYNAMODB_LOCK_TABLE_NAME);
-    this.dynamoDBPartitionKey = 
dynamoDBLockConfiguration.getString(DynamoDbBasedLockConfig.DYNAMODB_LOCK_PARTITION_KEY);
-    long leaseDuration = 
dynamoDBLockConfiguration.getInt(DynamoDbBasedLockConfig.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY);
+  protected DynamoDBBasedLockProviderBase(final LockConfiguration 
lockConfiguration, final StorageConfiguration<?> conf, DynamoDbClient dynamoDB) 
{
     if (dynamoDB == null) {
       dynamoDB = getDynamoDBClient();
     }
+    this.dynamoDbBasedLockConfig = new DynamoDbBasedLockConfig.Builder()
+        .fromProperties(lockConfiguration.getConfig())
+        .build();
+    this.tableName = 
dynamoDbBasedLockConfig.getString(DynamoDbBasedLockConfig.DYNAMODB_LOCK_TABLE_NAME);
+    long leaseDuration = 
dynamoDbBasedLockConfig.getInt(DynamoDbBasedLockConfig.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY);
+    dynamoDBPartitionKey = getDynamoDBPartitionKey(lockConfiguration);
+
     // build the dynamoDb lock client
     this.client = new AmazonDynamoDBLockClient(
         AmazonDynamoDBLockClientOptions.builder(dynamoDB, tableName)
@@ -99,14 +102,20 @@ public class DynamoDBBasedLockProvider implements 
LockProvider<LockItem>, Serial
     }
   }
 
+  public abstract String getDynamoDBPartitionKey(LockConfiguration 
lockConfiguration);
+
+  public String getPartitionKey() {
+    return dynamoDBPartitionKey;
+  }
+
   @Override
   public boolean tryLock(long time, TimeUnit unit) {
     LOG.info(generateLogStatement(LockState.ACQUIRING, 
generateLogSuffixString()));
     try {
       lock = 
client.acquireLock(AcquireLockOptions.builder(dynamoDBPartitionKey)
-              .withAdditionalTimeToWaitForLock(time)
-              .withTimeUnit(TimeUnit.MILLISECONDS)
-              .build());
+          .withAdditionalTimeToWaitForLock(time)
+          .withTimeUnit(TimeUnit.MILLISECONDS)
+          .build());
       LOG.info(generateLogStatement(LockState.ACQUIRED, 
generateLogSuffixString()));
     } catch (InterruptedException e) {
       throw new 
HoodieLockException(generateLogStatement(LockState.FAILED_TO_ACQUIRE, 
generateLogSuffixString()), e);
@@ -154,28 +163,28 @@ public class DynamoDBBasedLockProvider implements 
LockProvider<LockItem>, Serial
   }
 
   private DynamoDbClient getDynamoDBClient() {
-    String region = 
this.dynamoDBLockConfiguration.getString(DynamoDbBasedLockConfig.DYNAMODB_LOCK_REGION);
-    String endpointURL = 
this.dynamoDBLockConfiguration.contains(DynamoDbBasedLockConfig.DYNAMODB_ENDPOINT_URL.key())
-            ? 
this.dynamoDBLockConfiguration.getString(DynamoDbBasedLockConfig.DYNAMODB_ENDPOINT_URL)
-            : 
DynamoDbClient.serviceMetadata().endpointFor(Region.of(region)).toString();
+    String region = dynamoDbBasedLockConfig.getString(DYNAMODB_LOCK_REGION);
+    String endpointURL = 
dynamoDbBasedLockConfig.contains(DYNAMODB_ENDPOINT_URL.key())
+        ? dynamoDbBasedLockConfig.getString(DYNAMODB_ENDPOINT_URL)
+        : 
DynamoDbClient.serviceMetadata().endpointFor(Region.of(region)).toString();
 
     if (!endpointURL.startsWith("https://") && 
!endpointURL.startsWith("http://")) {
       endpointURL = "https://" + endpointURL;
     }
 
     return DynamoDbClient.builder()
-            .endpointOverride(URI.create(endpointURL))
-            
.credentialsProvider(HoodieAWSCredentialsProviderFactory.getAwsCredentialsProvider(dynamoDBLockConfiguration.getProps()))
-            .build();
+        .endpointOverride(URI.create(endpointURL))
+        
.credentialsProvider(HoodieAWSCredentialsProviderFactory.getAwsCredentialsProvider(dynamoDbBasedLockConfig.getProps()))
+        .build();
   }
 
   private void createLockTableInDynamoDB(DynamoDbClient dynamoDB, String 
tableName) {
-    String billingMode = 
dynamoDBLockConfiguration.getString(DynamoDbBasedLockConfig.DYNAMODB_LOCK_BILLING_MODE);
+    String billingMode = 
dynamoDbBasedLockConfig.getString(DYNAMODB_LOCK_BILLING_MODE);
     KeySchemaElement partitionKeyElement = KeySchemaElement
-            .builder()
-            .attributeName(DYNAMODB_ATTRIBUTE_NAME)
-            .keyType(KeyType.HASH)
-            .build();
+        .builder()
+        .attributeName(DYNAMODB_ATTRIBUTE_NAME)
+        .keyType(KeyType.HASH)
+        .build();
 
     List<KeySchemaElement> keySchema = new ArrayList<>();
     keySchema.add(partitionKeyElement);
@@ -185,20 +194,20 @@ public class DynamoDBBasedLockProvider implements 
LockProvider<LockItem>, Serial
     CreateTableRequest.Builder createTableRequestBuilder = 
CreateTableRequest.builder();
     if (billingMode.equals(BillingMode.PROVISIONED.name())) {
       
createTableRequestBuilder.provisionedThroughput(ProvisionedThroughput.builder()
-              
.readCapacityUnits(dynamoDBLockConfiguration.getLong(DynamoDbBasedLockConfig.DYNAMODB_LOCK_READ_CAPACITY))
-              
.writeCapacityUnits(dynamoDBLockConfiguration.getLong(DynamoDbBasedLockConfig.DYNAMODB_LOCK_WRITE_CAPACITY))
-              .build());
+          
.readCapacityUnits(dynamoDbBasedLockConfig.getLong(DYNAMODB_LOCK_READ_CAPACITY))
+          
.writeCapacityUnits(dynamoDbBasedLockConfig.getLong(DYNAMODB_LOCK_WRITE_CAPACITY))
+          .build());
     }
     createTableRequestBuilder.tableName(tableName)
-            .keySchema(keySchema)
-            .attributeDefinitions(attributeDefinitions)
-            .billingMode(billingMode);
+        .keySchema(keySchema)
+        .attributeDefinitions(attributeDefinitions)
+        .billingMode(billingMode);
     dynamoDB.createTable(createTableRequestBuilder.build());
 
     LOG.info("Creating dynamoDB table " + tableName + ", waiting for table to 
be active");
     try {
 
-      DynamoTableUtils.waitUntilActive(dynamoDB, tableName, 
dynamoDBLockConfiguration.getInt(DynamoDbBasedLockConfig.DYNAMODB_LOCK_TABLE_CREATION_TIMEOUT),
 20 * 1000);
+      DynamoTableUtils.waitUntilActive(dynamoDB, tableName, 
dynamoDbBasedLockConfig.getInt(DYNAMODB_LOCK_TABLE_CREATION_TIMEOUT), 20 * 
1000);
     } catch (DynamoTableUtils.TableNeverTransitionedToStateException e) {
       throw new HoodieLockException("Created dynamoDB table never transits to 
active", e);
     } catch (InterruptedException e) {
diff --git 
a/hudi-aws/src/main/java/org/apache/hudi/config/DynamoDbBasedLockConfig.java 
b/hudi-aws/src/main/java/org/apache/hudi/config/DynamoDbBasedLockConfig.java
index 0e884a6797f..207c7aba9da 100644
--- a/hudi-aws/src/main/java/org/apache/hudi/config/DynamoDbBasedLockConfig.java
+++ b/hudi-aws/src/main/java/org/apache/hudi/config/DynamoDbBasedLockConfig.java
@@ -47,6 +47,8 @@ public class DynamoDbBasedLockConfig extends HoodieConfig {
     return new DynamoDbBasedLockConfig.Builder();
   }
 
+  public static final int MAX_PARTITION_KEY_SIZE_BYTE = 2048;
+
   // configs for DynamoDb based locks
   public static final String DYNAMODB_BASED_LOCK_PROPERTY_PREFIX = 
LockConfiguration.LOCK_PREFIX + "dynamodb.";
 
@@ -140,7 +142,7 @@ public class DynamoDbBasedLockConfig extends HoodieConfig {
 
     public DynamoDbBasedLockConfig build() {
       lockConfig.setDefaults(DynamoDbBasedLockConfig.class.getName());
-      checkRequiredProps(lockConfig);
+      checkRequiredProps();
       return lockConfig;
     }
 
@@ -149,17 +151,14 @@ public class DynamoDbBasedLockConfig extends HoodieConfig 
{
       return this;
     }
 
-    private void checkRequiredProps(final DynamoDbBasedLockConfig config) {
+    private void checkRequiredProps() {
       String errorMsg = "Config key is not found: ";
       ValidationUtils.checkArgument(
-          config.contains(DYNAMODB_LOCK_TABLE_NAME.key()),
+          lockConfig.contains(DYNAMODB_LOCK_TABLE_NAME.key()),
           errorMsg + DYNAMODB_LOCK_TABLE_NAME.key());
       ValidationUtils.checkArgument(
-          config.contains(DYNAMODB_LOCK_REGION.key()),
+          lockConfig.contains(DYNAMODB_LOCK_REGION.key()),
           errorMsg + DYNAMODB_LOCK_REGION.key());
-      ValidationUtils.checkArgument(
-          config.contains(DYNAMODB_LOCK_PARTITION_KEY.key()),
-          errorMsg + DYNAMODB_LOCK_PARTITION_KEY.key());
     }
   }
-}
+}
\ No newline at end of file
diff --git 
a/hudi-aws/src/test/java/org/apache/hudi/aws/transaction/integ/ITTestDynamoDBBasedLockProvider.java
 
b/hudi-aws/src/test/java/org/apache/hudi/aws/transaction/integ/ITTestDynamoDBBasedLockProvider.java
index 10d8b068279..e28c2cf0b32 100644
--- 
a/hudi-aws/src/test/java/org/apache/hudi/aws/transaction/integ/ITTestDynamoDBBasedLockProvider.java
+++ 
b/hudi-aws/src/test/java/org/apache/hudi/aws/transaction/integ/ITTestDynamoDBBasedLockProvider.java
@@ -18,14 +18,24 @@
 
 package org.apache.hudi.aws.transaction.integ;
 
+import 
org.apache.hudi.aws.transaction.lock.DynamoDBBasedImplicitPartitionKeyLockProvider;
 import org.apache.hudi.aws.transaction.lock.DynamoDBBasedLockProvider;
+import org.apache.hudi.aws.transaction.lock.DynamoDBBasedLockProviderBase;
+import org.apache.hudi.common.config.HoodieCommonConfig;
 import org.apache.hudi.common.config.LockConfiguration;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.common.util.hash.HashID;
 import org.apache.hudi.config.DynamoDbBasedLockConfig;
+import org.apache.hudi.storage.StorageConfiguration;
 
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Disabled;
-import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
 import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
 import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
 import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
@@ -37,6 +47,7 @@ import java.net.URI;
 import java.util.Properties;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
 
 import static 
org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY;
 
@@ -47,62 +58,78 @@ import static 
org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_WAIT_
 @Disabled("HUDI-7475 The tests do not work. Disabling them to unblock Azure 
CI")
 public class ITTestDynamoDBBasedLockProvider {
 
-  private static LockConfiguration lockConfiguration;
-  private static DynamoDbClient dynamoDb;
-
+  private static final LockConfiguration LOCK_CONFIGURATION;
+  private static final LockConfiguration 
IMPLICIT_PART_KEY_LOCK_CONFIG_NO_BASE_PATH;
+  private static final LockConfiguration 
IMPLICIT_PART_KEY_LOCK_CONFIG_WITH_PART_KEY;
+  private static final LockConfiguration 
IMPLICIT_PART_KEY_LOCK_CONFIG_NO_TBL_NAME;
+  private static final LockConfiguration IMPLICIT_PART_KEY_LOCK_CONFIG;
   private static final String TABLE_NAME_PREFIX = "testDDBTable-";
   private static final String REGION = "us-east-2";
+  private static DynamoDbClient dynamoDb;
 
-  @BeforeAll
-  public static void setup() throws InterruptedException {
-    Properties properties = new Properties();
-    
properties.setProperty(DynamoDbBasedLockConfig.DYNAMODB_LOCK_BILLING_MODE.key(),
 BillingMode.PAY_PER_REQUEST.name());
-    // properties.setProperty(AWSLockConfig.DYNAMODB_LOCK_TABLE_NAME.key(), 
TABLE_NAME_PREFIX);
-    
properties.setProperty(DynamoDbBasedLockConfig.DYNAMODB_LOCK_PARTITION_KEY.key(),
 "testKey");
-    properties.setProperty(DynamoDbBasedLockConfig.DYNAMODB_LOCK_REGION.key(), 
REGION);
-    properties.setProperty(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY, "1000");
-    
properties.setProperty(DynamoDbBasedLockConfig.DYNAMODB_LOCK_READ_CAPACITY.key(),
 "0");
-    
properties.setProperty(DynamoDbBasedLockConfig.DYNAMODB_LOCK_WRITE_CAPACITY.key(),
 "0");
-    lockConfiguration = new LockConfiguration(properties);
-    dynamoDb = getDynamoClientWithLocalEndpoint();
-  }
+  static {
+    Properties dynamoDblpProps = new Properties();
+    Properties implicitPartKeyLpProps;
+    // Common properties shared by both lock providers.
+    
dynamoDblpProps.setProperty(DynamoDbBasedLockConfig.DYNAMODB_LOCK_BILLING_MODE.key(),
 BillingMode.PAY_PER_REQUEST.name());
+    
dynamoDblpProps.setProperty(DynamoDbBasedLockConfig.DYNAMODB_LOCK_TABLE_CREATION_TIMEOUT.key(),
 Integer.toString(20 * 1000 * 5));
+    
dynamoDblpProps.setProperty(DynamoDbBasedLockConfig.DYNAMODB_LOCK_REGION.key(), 
REGION);
+    dynamoDblpProps.setProperty(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY, "1000");
+    
dynamoDblpProps.setProperty(DynamoDbBasedLockConfig.DYNAMODB_LOCK_READ_CAPACITY.key(),
 "0");
+    
dynamoDblpProps.setProperty(DynamoDbBasedLockConfig.DYNAMODB_LOCK_WRITE_CAPACITY.key(),
 "0");
 
-  @Test
-  public void testAcquireLock() {
-    
lockConfiguration.getConfig().setProperty(DynamoDbBasedLockConfig.DYNAMODB_LOCK_TABLE_NAME.key(),
 TABLE_NAME_PREFIX + UUID.randomUUID());
-    DynamoDBBasedLockProvider dynamoDbBasedLockProvider = new 
DynamoDBBasedLockProvider(lockConfiguration, null, dynamoDb);
-    
Assertions.assertTrue(dynamoDbBasedLockProvider.tryLock(lockConfiguration.getConfig()
-            .getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY), 
TimeUnit.MILLISECONDS));
-    dynamoDbBasedLockProvider.unlock();
+    implicitPartKeyLpProps = new TypedProperties(dynamoDblpProps);
+    dynamoDblpProps = new TypedProperties(dynamoDblpProps);
+
+    // The original (day-1) DynamoDBBasedLockProvider accepts an optional 
partition key
+    
dynamoDblpProps.setProperty(DynamoDbBasedLockConfig.DYNAMODB_LOCK_PARTITION_KEY.key(),
 "testKey");
+    LOCK_CONFIGURATION = new LockConfiguration(dynamoDblpProps);
+
+    // For the newly added implicit partition key DDB lock provider, it can 
derive the partition key from
+    // hudi table base path and hudi table name. These properties are 
available in lockConfig as of today.
+    implicitPartKeyLpProps.setProperty(
+        HoodieCommonConfig.BASE_PATH.key(),
+        "gs://my-bucket-8b2a4b30/1718662238400/be715573/my_lake/my_table");
+    implicitPartKeyLpProps.setProperty(
+        HoodieTableConfig.HOODIE_TABLE_NAME_KEY, "ma_po_tofu_is_awesome");
+    IMPLICIT_PART_KEY_LOCK_CONFIG = new 
LockConfiguration(implicitPartKeyLpProps);
+    // With partition key nothing should break, the field should be simply 
ignored.
+    TypedProperties withPartKey = new TypedProperties(implicitPartKeyLpProps);
+    
withPartKey.setProperty(DynamoDbBasedLockConfig.DYNAMODB_LOCK_PARTITION_KEY.key(),
 "testKey");
+    IMPLICIT_PART_KEY_LOCK_CONFIG_WITH_PART_KEY = new 
LockConfiguration(withPartKey);
+
+    // Missing either base path or hoodie table name is a bad config for 
implicit partition key lock provider.
+    TypedProperties missingBasePath = new 
TypedProperties(implicitPartKeyLpProps);
+    missingBasePath.remove(HoodieCommonConfig.BASE_PATH.key());
+    IMPLICIT_PART_KEY_LOCK_CONFIG_NO_BASE_PATH = new 
LockConfiguration(missingBasePath);
+
+    TypedProperties missingTableName = new 
TypedProperties(implicitPartKeyLpProps);
+    missingTableName.remove(HoodieTableConfig.HOODIE_TABLE_NAME_KEY);
+    IMPLICIT_PART_KEY_LOCK_CONFIG_NO_TBL_NAME = new 
LockConfiguration(missingTableName);
   }
 
-  @Test
-  public void testUnlock() {
-    
lockConfiguration.getConfig().setProperty(DynamoDbBasedLockConfig.DYNAMODB_LOCK_TABLE_NAME.key(),
 TABLE_NAME_PREFIX + UUID.randomUUID());
-    DynamoDBBasedLockProvider dynamoDbBasedLockProvider = new 
DynamoDBBasedLockProvider(lockConfiguration, null, dynamoDb);
-    
Assertions.assertTrue(dynamoDbBasedLockProvider.tryLock(lockConfiguration.getConfig()
-            .getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY), 
TimeUnit.MILLISECONDS));
-    dynamoDbBasedLockProvider.unlock();
-    
Assertions.assertTrue(dynamoDbBasedLockProvider.tryLock(lockConfiguration.getConfig()
-            .getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY), 
TimeUnit.MILLISECONDS));
+  @BeforeAll
+  public static void setup() throws InterruptedException {
+    dynamoDb = getDynamoClientWithLocalEndpoint();
   }
 
-  @Test
-  public void testReentrantLock() {
-    
lockConfiguration.getConfig().setProperty(DynamoDbBasedLockConfig.DYNAMODB_LOCK_TABLE_NAME.key(),
 TABLE_NAME_PREFIX + UUID.randomUUID());
-    DynamoDBBasedLockProvider dynamoDbBasedLockProvider = new 
DynamoDBBasedLockProvider(lockConfiguration, null, dynamoDb);
-    
Assertions.assertTrue(dynamoDbBasedLockProvider.tryLock(lockConfiguration.getConfig()
-            .getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY), 
TimeUnit.MILLISECONDS));
-    
Assertions.assertFalse(dynamoDbBasedLockProvider.tryLock(lockConfiguration.getConfig()
-            .getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY), 
TimeUnit.MILLISECONDS));
-    dynamoDbBasedLockProvider.unlock();
+  public static Stream<Object> testDimensions() {
+    return Stream.of(
+        // Without partition key, only table name is used.
+        Arguments.of(IMPLICIT_PART_KEY_LOCK_CONFIG, 
DynamoDBBasedLockProvider.class),
+        Arguments.of(LOCK_CONFIGURATION, DynamoDBBasedLockProvider.class),
+        // Even if we have partition key set, nothing would break.
+        Arguments.of(IMPLICIT_PART_KEY_LOCK_CONFIG_WITH_PART_KEY, 
DynamoDBBasedImplicitPartitionKeyLockProvider.class),
+        Arguments.of(IMPLICIT_PART_KEY_LOCK_CONFIG, 
DynamoDBBasedImplicitPartitionKeyLockProvider.class)
+    );
   }
 
-  @Test
-  public void testUnlockWithoutLock() {
-    
lockConfiguration.getConfig().setProperty(DynamoDbBasedLockConfig.DYNAMODB_LOCK_TABLE_NAME.key(),
 TABLE_NAME_PREFIX + UUID.randomUUID());
-    DynamoDBBasedLockProvider dynamoDbBasedLockProvider = new 
DynamoDBBasedLockProvider(lockConfiguration, null, dynamoDb);
-    dynamoDbBasedLockProvider.unlock();
+  public static Stream<Object> badTestDimensions() {
+    return Stream.of(
+        Arguments.of(IMPLICIT_PART_KEY_LOCK_CONFIG_NO_TBL_NAME, 
DynamoDBBasedLockProvider.class),
+        Arguments.of(IMPLICIT_PART_KEY_LOCK_CONFIG_NO_TBL_NAME, 
DynamoDBBasedImplicitPartitionKeyLockProvider.class),
+        Arguments.of(IMPLICIT_PART_KEY_LOCK_CONFIG_NO_BASE_PATH, 
DynamoDBBasedImplicitPartitionKeyLockProvider.class)
+    );
   }
 
   private static DynamoDbClient getDynamoClientWithLocalEndpoint() {
@@ -111,13 +138,95 @@ public class ITTestDynamoDBBasedLockProvider {
       throw new IllegalStateException("dynamodb-local.endpoint system property 
not set");
     }
     return DynamoDbClient.builder()
-            .region(Region.of(REGION))
-            .endpointOverride(URI.create(endpoint))
-            .credentialsProvider(getCredentials())
-            .build();
+        .region(Region.of(REGION))
+        .endpointOverride(URI.create(endpoint))
+        .credentialsProvider(getCredentials())
+        .build();
   }
 
   private static AwsCredentialsProvider getCredentials() {
     return 
StaticCredentialsProvider.create(AwsBasicCredentials.create("random-access-key",
 "random-secret-key"));
   }
+
+  @ParameterizedTest
+  @MethodSource("badTestDimensions")
+  void testBadConfig(LockConfiguration lockConfig, Class<?> lockProviderClass) 
{
+    
lockConfig.getConfig().setProperty(DynamoDbBasedLockConfig.DYNAMODB_LOCK_TABLE_NAME.key(),
 TABLE_NAME_PREFIX + UUID.randomUUID());
+    Exception e = new Exception();
+    try {
+      ReflectionUtils.loadClass(
+          lockProviderClass.getName(),
+          new Class<?>[] {LockConfiguration.class, StorageConfiguration.class, 
DynamoDbClient.class},
+          lockConfig, null, dynamoDb);
+    } catch (Exception ex) {
+      e = ex;
+    }
+    Assertions.assertEquals(IllegalArgumentException.class, 
e.getCause().getCause().getClass());
+  }
+
+  @ParameterizedTest
+  @MethodSource("testDimensions")
+  void testAcquireLock(LockConfiguration lockConfig, Class<?> 
lockProviderClass) {
+    
lockConfig.getConfig().setProperty(DynamoDbBasedLockConfig.DYNAMODB_LOCK_TABLE_NAME.key(),
 TABLE_NAME_PREFIX + UUID.randomUUID());
+    DynamoDBBasedLockProviderBase dynamoDbBasedLockProvider = 
(DynamoDBBasedLockProviderBase) ReflectionUtils.loadClass(
+        lockProviderClass.getName(),
+        new Class<?>[] {LockConfiguration.class, StorageConfiguration.class, 
DynamoDbClient.class},
+        new Object[] {lockConfig, null, dynamoDb});
+
+    // Also validate the partition key is properly constructed.
+    if (lockProviderClass.equals(DynamoDBBasedLockProvider.class)) {
+      String tableName = (String) 
lockConfig.getConfig().get(HoodieTableConfig.HOODIE_TABLE_NAME_KEY);
+      String partitionKey = (String) 
lockConfig.getConfig().get(DynamoDbBasedLockConfig.DYNAMODB_LOCK_PARTITION_KEY.key());
+      if (partitionKey != null) {
+        Assertions.assertEquals(partitionKey, 
dynamoDbBasedLockProvider.getPartitionKey());
+      } else {
+        Assertions.assertEquals(tableName, 
dynamoDbBasedLockProvider.getPartitionKey());
+      }
+    } else if 
(lockProviderClass.equals(DynamoDBBasedImplicitPartitionKeyLockProvider.class)) 
{
+      String tableName = (String) 
lockConfig.getConfig().get(HoodieTableConfig.HOODIE_TABLE_NAME_KEY);
+      String basePath = (String) 
lockConfig.getConfig().get(HoodieCommonConfig.BASE_PATH.key());
+      Assertions.assertEquals(
+          tableName + '-' + HashID.generateXXHashAsString(basePath, 
HashID.Size.BITS_64),
+          dynamoDbBasedLockProvider.getPartitionKey());
+    }
+    
Assertions.assertTrue(dynamoDbBasedLockProvider.tryLock(lockConfig.getConfig().getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY),
 TimeUnit.MILLISECONDS));
+    dynamoDbBasedLockProvider.unlock();
+  }
+
+  @ParameterizedTest
+  @MethodSource("testDimensions")
+  void testUnlock(LockConfiguration lockConfig, Class<?> lockProviderClass) {
+    
lockConfig.getConfig().setProperty(DynamoDbBasedLockConfig.DYNAMODB_LOCK_TABLE_NAME.key(),
 TABLE_NAME_PREFIX + UUID.randomUUID());
+    DynamoDBBasedLockProviderBase dynamoDbBasedLockProvider = 
(DynamoDBBasedLockProviderBase) ReflectionUtils.loadClass(
+        lockProviderClass.getName(),
+        new Class<?>[] {LockConfiguration.class, StorageConfiguration.class, 
DynamoDbClient.class},
+        new Object[] {lockConfig, null, dynamoDb});
+    
Assertions.assertTrue(dynamoDbBasedLockProvider.tryLock(lockConfig.getConfig().getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY),
 TimeUnit.MILLISECONDS));
+    dynamoDbBasedLockProvider.unlock();
+    
Assertions.assertTrue(dynamoDbBasedLockProvider.tryLock(lockConfig.getConfig().getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY),
 TimeUnit.MILLISECONDS));
+  }
+
+  @ParameterizedTest
+  @MethodSource("testDimensions")
+  void testReentrantLock(LockConfiguration lockConfig, Class<?> 
lockProviderClass) {
+    
lockConfig.getConfig().setProperty(DynamoDbBasedLockConfig.DYNAMODB_LOCK_TABLE_NAME.key(),
 TABLE_NAME_PREFIX + UUID.randomUUID());
+    DynamoDBBasedLockProviderBase dynamoDbBasedLockProvider = 
(DynamoDBBasedLockProviderBase) ReflectionUtils.loadClass(
+        lockProviderClass.getName(),
+        new Class<?>[] {LockConfiguration.class, StorageConfiguration.class, 
DynamoDbClient.class},
+        new Object[] {lockConfig, null, dynamoDb});
+    
Assertions.assertTrue(dynamoDbBasedLockProvider.tryLock(lockConfig.getConfig().getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY),
 TimeUnit.MILLISECONDS));
+    
Assertions.assertFalse(dynamoDbBasedLockProvider.tryLock(lockConfig.getConfig().getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY),
 TimeUnit.MILLISECONDS));
+    dynamoDbBasedLockProvider.unlock();
+  }
+
+  @ParameterizedTest
+  @MethodSource("testDimensions")
+  void testUnlockWithoutLock(LockConfiguration lockConfig, Class<?> 
lockProviderClass) {
+    
lockConfig.getConfig().setProperty(DynamoDbBasedLockConfig.DYNAMODB_LOCK_TABLE_NAME.key(),
 TABLE_NAME_PREFIX + UUID.randomUUID());
+    DynamoDBBasedLockProviderBase dynamoDbBasedLockProvider = 
(DynamoDBBasedLockProviderBase) ReflectionUtils.loadClass(
+        lockProviderClass.getName(),
+        new Class<?>[] {LockConfiguration.class, StorageConfiguration.class, 
DynamoDbClient.class},
+        new Object[] {lockConfig, null, dynamoDb});
+    dynamoDbBasedLockProvider.unlock();
+  }
 }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/util/hash/HashID.java 
b/hudi-common/src/main/java/org/apache/hudi/common/util/hash/HashID.java
index 4df8c385289..e76c5a8d073 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/hash/HashID.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/hash/HashID.java
@@ -79,10 +79,10 @@ public class HashID implements Serializable {
   }
 
   /**
-   * Get the hash value for a string message and for the desired @{@link Size}.
+   * Get the hash value for a string message and for the desired {@link 
HashID.Size}.
    *
    * @param message - String message to get the hash value for
-   * @param bits    - @{@link Size} of the hash value
+   * @param bits    - {@link HashID.Size} of the hash value
    * @return Hash value for the message as byte array
    */
   public static byte[] hash(final String message, final Size bits) {
@@ -108,6 +108,24 @@ public class HashID implements Serializable {
     }
   }
 
+  /**
+   * Get the hash value as an uppercase hexadecimal string for a given string and for the desired 
{@link Size}.
+   *
+   * @param input - String message to get the hash value for.
+   * @param size  - {@link Size} of the hash value
+   * @return Hash value for the message as an uppercase hexadecimal string.
+   */
+  public static String generateXXHashAsString(String input, Size size) {
+    // Compute the hash
+    byte[] hashBytes = hash(input, size);
+    // Convert the hash value to a hexadecimal string
+    StringBuilder hexString = new StringBuilder();
+    for (byte hashByte : hashBytes) {
+      hexString.append(String.format("%02X", hashByte));
+    }
+    return hexString.toString();
+  }
+
   public static int getXXHash32(final String message, int hashSeed) {
     return getXXHash32(getUTF8Bytes(message), hashSeed);
   }
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/util/hash/TestHashID.java 
b/hudi-common/src/test/java/org/apache/hudi/common/util/hash/TestHashID.java
index 1ab9d82b2b9..319e216ff4f 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/util/hash/TestHashID.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/util/hash/TestHashID.java
@@ -21,24 +21,40 @@ package org.apache.hudi.common.util.hash;
 
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.EnumSource;
+import org.junit.jupiter.params.provider.MethodSource;
 
 import javax.xml.bind.DatatypeConverter;
 
-import java.util.Arrays;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Random;
 
+import static java.util.Arrays.asList;
 import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /**
  * Tests {@link HashID}.
  */
 public class TestHashID {
 
+  public static List<Arguments> hashIdSizes() {
+    return asList(
+        Arguments.of(HashID.Size.BITS_32),
+        Arguments.of(HashID.Size.BITS_64),
+        Arguments.of(HashID.Size.BITS_128)
+    );
+  }
+
   /**
    * Test HashID of all sizes for ByteArray type input message.
    */
@@ -124,8 +140,26 @@ public class TestHashID {
       for (Map.Entry<String, String> sizeEntry : 
allSizeEntries.getValue().entrySet()) {
         final byte[] actualHashBytes = HashID.hash(sizeEntry.getKey(), 
allSizeEntries.getKey());
         final byte[] expectedHashBytes = 
DatatypeConverter.parseHexBinary(sizeEntry.getValue());
-        assertTrue(Arrays.equals(expectedHashBytes, actualHashBytes));
+        assertArrayEquals(expectedHashBytes, actualHashBytes);
       }
     }
   }
+
+  @ParameterizedTest
+  @MethodSource("hashIdSizes")
+  void testGenerateXXHashMagicNumber(HashID.Size size) throws IOException, 
URISyntaxException {
+    // We need to make sure we always generate the same hash value for the 
same input. This test
+    // guards against unexpected change of hash value due to accidents like 
library version upgrade.
+    // Load inputs and expected hash values from files.
+    List<String> inputs = Files.readAllLines(Paths.get(Objects.requireNonNull(
+        
getClass().getClassLoader().getResource("hash/magic_input.txt")).toURI()));
+    List<String> expectedHash = 
Files.readAllLines(Paths.get(Objects.requireNonNull(
+        
getClass().getClassLoader().getResource(String.format("hash/xxhash_%s_for_magic_input.txt",
 size.name()))).toURI()));
+
+    for (int i = 0; i < expectedHash.size(); ++i) {
+      String hash = HashID.generateXXHashAsString(inputs.get(i), size);
+      // Magic number test to guard against accidental upgrade that changes 
the hash value
+      assertEquals(expectedHash.get(i), hash);
+    }
+  }
 }
diff --git a/hudi-common/src/test/resources/hash/magic_input.txt 
b/hudi-common/src/test/resources/hash/magic_input.txt
new file mode 100644
index 00000000000..ef6468b087a
--- /dev/null
+++ b/hudi-common/src/test/resources/hash/magic_input.txt
@@ -0,0 +1,10 @@
+vB1xYjlEfL8tN9Ap6NMCwraFQlpQ6QU2z9XCPCv9lfIxzRfs6GfxKir2FuU:Xbl9uMZqjNRhKLqZY2MIH5vPCKZbV1zjoHVaVBnyRfzJLZ0jwWZQjJqjpdg/PsrIq3YdyAsDHcb__oXfbZUpMFX00g6ALt/uRZ2LOFFdQA9Ofuu_BLhhPl4gIfGNJIt1ATuCGIFXFG4zg1qn96MsLy8Nr2v/6pxzBXM5Rhzn6o_HhiFo1RzooJcm1tJlObZroxBNKSDbDcdYkm4KnAVUAVcyVfO2DRx_KQfphLYbG1rNRTV03RNN7jQakMf8Vq38Q1Joz:STilSqalcvrDmdPTY1RETh8eVP0ELqsKevQ22OKKl5ypy4I2Y6nqRfTfoepo_l0zc_QEZNawdif5l:SpvaJUaWU9rpUpqxiz2T_40ekLVXDM_B9oGgS4e9AMT50OASHp/cVue2qFpgrYSptp4EZZ9yfEqPXX3Ij7hSUGL29zar
 [...]
+EXDDWHY_7:5oYXsoCWM1LVD2mxXAilEXwsA8J_btuAAPUm6ODVlM/njpnAyULYvBKrAiwjY4xMWkJLCGS09qqmvu8ea5:ib/WYg1nHAELdy8fiREkQ/QXIByXL1xJ40ZfPYiMzUlNoUMXJNVqK59bipG7NSOhEZ0KSAW9uxv77S4G:8B1ybZNgWbYx_f1/DqYgPD7PcHfr3c_NMYvHVOCEOB3oYZwU2p_/BRWfaY/aJfydpG_VivzSqYM7qCghR8LXEL8955iXjPVMpXHu03BdAqE/uXsrlvTm8sdzum/PNsrwHYH:2YCHRjJ7B9yawP2mYZmmCTA9FSAqhizLrs7JvqUQNfF0DW8Imy_ld__eTq0hM55H3QEIzDs:u45zd6Y/1hEeBMFEGmgXHJDOGfSt1boX_lO6q/M33U5n0LEv1o_4_tQeqnsqODq509y7rbgfOVg7whAtZ81QlXNQ3VITG0kadegE3EFcD6OwC1I3BE
 [...]
+qbl3zptxyhBdGZzXYSuz/dc_W79I3nnZIg41:_ujQnA8NnzrsENtSvxUX5Hj5nHyjmlzS9vH2zawvXaoeTtwXPqEVp0Vc2p7Ho93iRUq:Zj/bjAxIuxBXEJMPYTE5PBVPSR2yPP4ci0ChrVtIRCXTD7v9gluqs4ouz_G8sNuXaT8uCP7ypOMRUG:VQuQ69yY/KIS1_AffBHL8n7jgT7nGfzra5dZZUBe4oqm9BI3nyHpCa5ookke:703QP:8sDMg3UdCg8uw6ccqAcco7xBbOgdvS6qpz1LZdku0:As2LOmX42B0aEReh52BdtZtBFnk5t_cm0m5ZAuEH_Bii8:reXocImMCSsnL2hVpbs8gn8jI4QAGW2rDOvj1N/byJbjrJxhlsZ6Y9dDZc7Z8B8aCx8kRoRQVkskAO4lULO1pNaIINreWaij:tJhHeF_kkhSevooTRkGXoWouI/1a4ADi5ijTESgsrBleqpU2e4yGmBKw
 [...]
+kha:j60RU2k4jsMcyoK/gJ7F4IEkoWzrtVN3kUEPvP/9M4NfaoPtruIe0reUxlMhoTKJbLTLFJ0pIVfqGrx/VAcW1ZWdjret19cb7TAP08wpOP_t5ljP/pUYR5KsEn5zUNtKWiNHJxPsT7u9FYNupiIRIe_VGG_J:1wPQJ2xBtWCbiCOGk15zGoOHhAjTE8V5JwyyJHrxSfZQpHXMqOYICII9/rXoKpTsV/uUASF58e/OuQ9pg0IA4KENs3_XnptctzloUYr0AI4zTLIRL5N:CvFRujqaOmKgiMGYmC1V9DdYkUTAiv_3NYvUnJuV54mXMEQlAoyrILPimonjdpjLx7fjep9vPjImfsyIXK3EoStTNJoWcAWsOzSn:wb84sBDg4mfDYvqz3pL13SFbIxtlsAl66/11xBVIUI0TRK6KWX5yX806JnbFSG:oi6X4_OdwucUDSx:ac_RqzZiA9Ig:wFg2dg6JYrVt3iLj/KDaS0
 [...]
+XViqCa5XfqAB0kski4A4N82FE8i0U/WjPvcgLbi:X9LhXRV0qfusVp19Xr_5pCQUDFRr34z_foGnpbWC5:pOn:jQHwVJIer:hfh5uErwLimX6OC1nLPXdPVrhKJ4/XN2YBfF5luB_7:dEGJ1E22iCTgto6sQW63f/c:wUtZdRioJ3JiA14KMVRKI8KD79zrCF1p61XSjQUvzrBh4VOjxGervgu2kZarv4HvvzvTuzThk4yrE1YvQse5iKeZWHbjCYBS7Tr0eTAjZLIqZRZ4/t/zuI9ZgnIerWJ8z3kfYv:maAiYP:Fjc4k32CXdXVhfiP9OLMqu4XWA2U1i7htnAotjF3G1ECwGLo9faRUxp5XwK85DhoQqrld/9e4BX4FxI61omgU_sa9SlSI:oU4/GrmsejIJn8adPcxwUyYf5uetKOVbcuBesp9l7pVmc4O97sojQ6:ohnhcvGssks0UpzdUideM8Hfo0yIWF1XPSJjJV
 [...]
+uYeJoV7jMpUbSfo0PZfVqxapg8VlFDxp7t6x3M:r/hMj3EdSrXNmtnAJOg/DubMz6uGvkZM3equxs8LflyyjZYFypahe90QXYM6mngjaMpXGl/ngy6_wQ512E7tqMnaVaz52xbKAdCCgastCJ5xM2jikxmIOK/28J77VXwkL6T43JGKXrFzoqm/eelXf:G5oihZt/jBKTMg/wXgC0:BvTVM9A1e4nmXw_ANjnBxCWe7QD7O/A/k4hrVt0MKpH2glsJKAnz_ZAY_qZp33vN5v975ZFwhAJtkYwtsXl4UFT_06Uy9BHyWnMcvHaPgEgslv/_P2FLzlJWfsSI8UjWUdMVhYAff4NE7UQF2Oye6ciXsW1ga_TS0VN8tugmqE:xs:5/SlyG3puDh:mzA60V7EpM38_rsKO/v9l1ZG9ICucglBv0hzHuVz/_vUR_5B_scT4nEIFmLOB67RsoGXMDVI7oJzETHzA3DLk:nX1yF4g_VT
 [...]
+1/dFjdWmwjcDBD6QBMjRqWtTJEdCZ2:2X2:NiPutlDhlxkp7k2MBqiAMDffvA5bbe0Yxel0IF94is_h0cMtgXKwYgCPKgFQgH0O2hnFrWp2fmETy4DCHnUV54mNUF4_xMESiZwJaUQFCPUxK8j28qlPppbt2M:zsJ64LUVIoCZpJsc1GRVY9lImikNRoEiOXKOQIMLOHlDfVEYyJubQy14zWQsqSoHK6Du_S76_MdZi3DEHJmInbDHvykJRoKke2QQL7wae/7D4FVg/zKbfnOSrgDTWbdDewy2lJKMti5MnrMnevTCQSSFhueoekPYsLiyoDIDWDoqAzdiWP2vkzisR1CbZ74DQeOyyk4rvF85TRTfjPWjdv6PZX40W0K2VOPWUiys5224nSnfT886LVBkuKwjImo8NqZskfp1LYJtMRYlad9/fRLVwo65h:l4FFLEuPH1OzlFaz6wsMqqh4GGtQeScwfzTWA78A9iosPNyn
 [...]
+twVaFB_AiBxWsfHRYHdhnwMtrtp/PzyU6lJivvSGGRfGmLUu0EcaW5dzO0vx/Hy/E33JY:2oeb1p:Hl35zPFj:98DQRRyodl5JJrBiNeyavBKlQPoYglYIpWK3uy47lsTHXe2SYrEEscKtwH/:BJiQn4XPQejfVBPNPI35Othp64j0tf2GBiqrAETQT0xnSrN6vnQnNWAj:RDR9IbUSZISaJ/GrQoT651W7iwQQvBo5L/XcdCrvWGbmw/3Yp5S2HeoA/:qJ:6gmrOQ9eoFGCe8X9yERKKbPzcmSB7SXKuP3UUbT1L0_vvFOP2mIsr46k:77G31gPJv6zzdDZLR1Ewxaz38vR6MeSjTtKR53yw44cWgWPD2wS/GHJDI4PhHxnsgUamNi8_PVmdFceTXouehmnrdrM_byV_iQOF:3Jq7czDhcAwMF2eYqZkdlPdG:kr:WQoIBu2UTgZB/uKI8kf7l4qosy:aTz_QLmouh5ilZ1
 [...]
+wM51f5wQY1p4N_J4hC/oN9BryO/ZUwUisXuYaGHIJGDZmTE52hPf7vJdPTR:NiJiPCNMG:cnWGoJwNRiKIQY3u/Yppx6Ig6wdvScLqPhXI5Fxq/W4XUgBkCwEVkhfd3FCv0/LmaNvg6RJelbz:/ympzvRLOpAgsYmxtnqh17rEqCkxKp4nC4o_j7VLJgyHqXYP9RnKw2sTpMP7seq1zHe00drKb_k:_iS59NZ2NnAJBjjKCo2kbNmT4LvAVa9COquUyCF4Q:Qp39Rxs30mNtTW30x_0LuL8TH4pg65Z0OjBoQtwo4:GBF_2rE2WbkRDeNeGGB:DUU4p0MqkkYTT7jraK10pmKi2C:wH6HXN8SX6UwCJoY2m24sIaU3f:k2vBzqYu4Mq5E9slvsx:vNjlA4KtSiy5XLt01s0bRuXhXXrjQuSsFXvGUdlppWtQcR8Xbg6W0Bk5Qdx267BBbsQ5bk6JOg8xHfGqcy2Kde_u_ZFz
 [...]
+AXipEHIS1WnAkrLS_TZT8AEBe2KPR:0tgOLRhUIEiN33Ml4_pOtN6TuDNvfSa4MSPexRluckSpCnl6le__MucXEilapxfqXY/MW3ebuICROpXQr9rn4fvJxxR25eeyDDOJfVkXu9gTwPJD:CAqb:byah1PXndGFR3L4NNgzofPKVvyh/liGhvcU54qF3:U8B4Ed4K5VVZKlDC_LIjyPpO:fzwhCY:a7plQHp3G8rwqZlVWll8gZt1BRTJS3nvxCLNe2IPjq5aj5QD/P3O4QkLPDW/bdXTFxfILaQ3xrBO:kqPMyqowiLcm6gAwY5fdkPgvv6xh7yC5t8d98io1UxEOoVdWdGUXgztIKs2gxjzVUqj6qsbLaIg2ft/j4txWJMQ3z3anRSmCZ1Th6V7LFEKaX2PBlK//eC0Sm1f45XPf5MWpHuRMxBzxFqY60IXZVJ1tx/Daa86UC5A0jyyJyPxmdUlt1FTqDqZ_TQK4qTWGrz
 [...]
diff --git 
a/hudi-common/src/test/resources/hash/xxhash_BITS_128_for_magic_input.txt 
b/hudi-common/src/test/resources/hash/xxhash_BITS_128_for_magic_input.txt
new file mode 100644
index 00000000000..ee2a4f5fbea
--- /dev/null
+++ b/hudi-common/src/test/resources/hash/xxhash_BITS_128_for_magic_input.txt
@@ -0,0 +1,10 @@
+AF26454CFF3D72BBDB7A6CD6BAF23770
+080D42690A45825782F4CD4DB31AA449
+E14D364EF059C966A7F50708E9CF2E7B
+68F4BCB6F281DD101D5B7375652E574E
+1F2A77EB57C4091E9F89887DA88169B2
+1A57459BE2F7ECD655C090BADAA3ED1A
+E79D2F90D440296F94419C7093518DE4
+86C4CFD114B59C0CB29AF95425767B63
+EE70F0B8607E339D92E6823DA2538113
+0E6F75EC00B024201B91FBCEBDB9F971
\ No newline at end of file
diff --git 
a/hudi-common/src/test/resources/hash/xxhash_BITS_32_for_magic_input.txt 
b/hudi-common/src/test/resources/hash/xxhash_BITS_32_for_magic_input.txt
new file mode 100644
index 00000000000..4cba48b9f4e
--- /dev/null
+++ b/hudi-common/src/test/resources/hash/xxhash_BITS_32_for_magic_input.txt
@@ -0,0 +1,10 @@
+0304CA31
+71EC1ABE
+4F003C35
+79A69D14
+62CAF229
+0136BCE2
+1D68C842
+0B416E0E
+CF7373EB
+F5933F12
\ No newline at end of file
diff --git 
a/hudi-common/src/test/resources/hash/xxhash_BITS_64_for_magic_input.txt 
b/hudi-common/src/test/resources/hash/xxhash_BITS_64_for_magic_input.txt
new file mode 100644
index 00000000000..bc06cc5614e
--- /dev/null
+++ b/hudi-common/src/test/resources/hash/xxhash_BITS_64_for_magic_input.txt
@@ -0,0 +1,10 @@
+F0E17DF624BF7B6A
+07EC6A7186973385
+24A8A6F1FEA11BA2
+BFE00C8A367BAF03
+C323770B790400C8
+9F96033C96E80D39
+5B99015437BBEA94
+C6C54FCA5141979D
+3580238C2761459E
+D9DBB1D9BF7174BC
\ No newline at end of file
diff --git a/hudi-io/src/main/java/org/apache/hudi/common/util/StringUtils.java 
b/hudi-io/src/main/java/org/apache/hudi/common/util/StringUtils.java
index ded0d876eef..01e7426a6a6 100644
--- a/hudi-io/src/main/java/org/apache/hudi/common/util/StringUtils.java
+++ b/hudi-io/src/main/java/org/apache/hudi/common/util/StringUtils.java
@@ -327,4 +327,58 @@ public class StringUtils {
     }
     return str.substring(0, end);
   }
+
+  /**
+   * Concatenates two strings such that the total byte length does not exceed 
the threshold.
+   * If the total byte length exceeds the threshold, the function will find 
the maximum length of the first string
+   * that fits within the threshold and concatenate that with the second 
string.
+   *
+   * @param a         The first string
+   * @param b         The second string
+   * @param threshold The maximum byte length
+   */
+  public static String concatenateWithThreshold(String a, String b, int 
threshold) {
+    // Convert both strings to byte arrays in UTF-8 encoding
+    byte[] bytesA = getUTF8Bytes(a);
+    byte[] bytesB = getUTF8Bytes(b);
+    if (bytesB.length > threshold) {
+      throw new IllegalArgumentException(String.format(
+          "Length of the Second string to concatenate exceeds the threshold 
(%d > %d)",
+          bytesB.length, threshold));
+    }
+
+    // Calculate total bytes
+    int totalBytes = bytesA.length + bytesB.length;
+
+    // If total bytes is within the threshold, return concatenated string
+    if (totalBytes <= threshold) {
+      return a + b;
+    }
+
+    // Calculate the maximum bytes 'a' can take
+    int bestLength = getBestLength(a, threshold - bytesB.length);
+
+    // Concatenate the valid substring of 'a' with 'b'
+    return a.substring(0, bestLength) + b;
+  }
+
+  private static int getBestLength(String a, int threshold) {
+    // Binary search to find the maximum length of substring of 'a' that fits 
within threshold bytes
+    int low = 0;
+    int high = Math.min(a.length(), threshold);
+    int bestLength = 0;
+
+    while (low <= high) {
+      int mid = (low + high) / 2;
+      byte[] subABytes = getUTF8Bytes(a.substring(0, mid));
+
+      if (subABytes.length <= threshold) {
+        bestLength = mid;
+        low = mid + 1;
+      } else {
+        high = mid - 1;
+      }
+    }
+    return bestLength;
+  }
 }
diff --git 
a/hudi-io/src/test/java/org/apache/hudi/common/util/TestStringUtils.java 
b/hudi-io/src/test/java/org/apache/hudi/common/util/TestStringUtils.java
index 7cbf065b880..049601e3517 100644
--- a/hudi-io/src/test/java/org/apache/hudi/common/util/TestStringUtils.java
+++ b/hudi-io/src/test/java/org/apache/hudi/common/util/TestStringUtils.java
@@ -22,14 +22,19 @@ package org.apache.hudi.common.util;
 import org.junit.jupiter.api.Test;
 
 import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.security.SecureRandom;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.Random;
 
+import static org.apache.hudi.common.util.StringUtils.concatenateWithThreshold;
 import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /**
@@ -39,6 +44,29 @@ public class TestStringUtils {
 
   private static final String[] STRINGS = {"This", "is", "a", "test"};
 
+  private static final String CHARACTERS_FOR_RANDOM_GEN = 
"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789_/:";
+  private static final Random RANDOM = new SecureRandom();
+
+  private static String toHexString(byte[] bytes) {
+    StringBuilder sb = new StringBuilder(bytes.length * 2);
+    for (byte b : bytes) {
+      sb.append(String.format("%02x", b));
+    }
+    return sb.toString();
+  }
+
+  private static String generateRandomString(int length) {
+    if (length < 1) {
+      throw new IllegalArgumentException("Length must be greater than 0");
+    }
+    StringBuilder builder = new StringBuilder(length);
+    for (int i = 0; i < length; i++) {
+      int randomIndex = RANDOM.nextInt(CHARACTERS_FOR_RANDOM_GEN.length());
+      builder.append(CHARACTERS_FOR_RANDOM_GEN.charAt(randomIndex));
+    }
+    return new String(getUTF8Bytes(builder.toString()), 
StandardCharsets.UTF_8);
+  }
+
   @Test
   public void testStringJoinWithDelim() {
     String joinedString = StringUtils.joinUsingDelim("-", STRINGS);
@@ -108,14 +136,6 @@ public class TestStringUtils {
     assertEquals(StringUtils.toHexString(getUTF8Bytes(str)), 
toHexString(getUTF8Bytes(str)));
   }
 
-  private static String toHexString(byte[] bytes) {
-    StringBuilder sb = new StringBuilder(bytes.length * 2);
-    for (byte b : bytes) {
-      sb.append(String.format("%02x", b));
-    }
-    return sb.toString();
-  }
-
   @Test
   public void testTruncate() {
     assertNull(StringUtils.truncate(null, 10, 10));
@@ -129,7 +149,61 @@ public class TestStringUtils {
     assertTrue(StringUtils.compareVersions("1.9", "1.10") < 0);
     assertTrue(StringUtils.compareVersions("1.100.1", "1.10") > 0);
     assertTrue(StringUtils.compareVersions("1.10.1", "1.10") > 0);
-    assertTrue(StringUtils.compareVersions("1.10", "1.10") == 0);
+    assertEquals(0, StringUtils.compareVersions("1.10", "1.10"));
+  }
+
+  @Test
+  void testConcatenateWithinThreshold() {
+    String a = generateRandomString(1000); // 1000 bytes in UTF-8
+    String b = generateRandomString(1048); // 1048 bytes in UTF-8
+    int threshold = 2048;
+
+    // The total byte length of `a` + `b` equals the threshold exactly
+    String result = StringUtils.concatenateWithThreshold(a, b, threshold);
+
+    // The resulting string should be exactly `threshold` bytes long
+    assertEquals(threshold, getUTF8Bytes(result).length);
+    assertEquals(a + b, result);
+
+    // Test case when a + b is within the threshold
+    String a2 = generateRandomString(900);
+    String b2 = generateRandomString(1000);
+    String result2 = concatenateWithThreshold(a2, b2, threshold);
+
+    // The resulting string should be `a2 + b2`
+    assertEquals(a2 + b2, result2);
+  }
+
+  @Test
+  void testConcatenateInvalidInput() {
+    // Test case when b alone exceeds the threshold
+    String a = generateRandomString(900);
+    String b = generateRandomString(3000); // 3000 bytes in UTF-8
+    Exception exception = assertThrows(IllegalArgumentException.class, () -> {
+      concatenateWithThreshold(a, b, 2048);
+    });
+
+    String expectedMessage = "Length of the Second string to concatenate 
exceeds the threshold (3000 > 2048)";
+    String actualMessage = exception.getMessage();
+
+    assertTrue(actualMessage.contains(expectedMessage));
+  }
+
+  @Test
+  void testConcatenateTruncateCase() {
+    // 'é' is 2 bytes in UTF-8
+    assertEquals("ad", concatenateWithThreshold("aé", "d", 3));
+    // Each of these Chinese characters is 3 bytes in UTF-8
+    assertEquals("世d", concatenateWithThreshold("世界", "d", 4));
+    assertEquals("ad", concatenateWithThreshold("ab", "d", 2));
+  }
+
+  @Test
+  void testGenerateInvalidRandomString() {
+    assertThrows(
+        IllegalArgumentException.class,
+        () -> generateRandomString(-1)
+    );
   }
 
   @Test
diff --git a/scripts/release/validate_source_copyright.sh 
b/scripts/release/validate_source_copyright.sh
index d44864135be..deb79666269 100755
--- a/scripts/release/validate_source_copyright.sh
+++ b/scripts/release/validate_source_copyright.sh
@@ -46,10 +46,10 @@ echo -e "\t\tNotice file exists ? [OK]\n"
 
 ### Licensing Check
 echo "Performing custom Licensing Check "
-numfilesWithNoLicense=`find . -iname '*' -type f | grep -v NOTICE | grep -v 
LICENSE | grep -v '.jpg' | grep -v '.json' | grep -v '.hfile' | grep -v '.data' 
| grep -v '.commit' | grep -v emptyFile | grep -v DISCLAIMER | grep -v KEYS | 
grep -v '.mailmap' | grep -v '.sqltemplate' | grep -v 'banner.txt' | grep -v 
"fixtures" | xargs grep -L "Licensed to the Apache Software Foundation (ASF)" | 
wc -l`
+numfilesWithNoLicense=`find . -iname '*' -type f | grep -v NOTICE | grep -v 
LICENSE | grep -v '.jpg' | grep -v '.json' | grep -v '.hfile' | grep -v '.data' 
| grep -v '.commit' | grep -v emptyFile | grep -v DISCLAIMER | grep -v KEYS | 
grep -v '.mailmap' | grep -v '.sqltemplate' | grep -v 'banner.txt' | grep -v 
'.txt' | grep -v "fixtures" | xargs grep -L "Licensed to the Apache Software 
Foundation (ASF)" | wc -l`
 if [ "$numfilesWithNoLicense" -gt  "0" ]; then
   echo "There were some source files that did not have Apache License [ERROR]"
-  find . -iname '*' -type f | grep -v NOTICE | grep -v LICENSE | grep -v 
'.jpg' | grep -v '.json' | grep -v '.hfile' | grep -v '.data' | grep -v 
'.commit' | grep -v emptyFile | grep -v DISCLAIMER | grep -v '.sqltemplate' | 
grep -v KEYS | grep -v '.mailmap' | grep -v 'banner.txt' | grep -v "fixtures" | 
xargs grep -L "Licensed to the Apache Software Foundation (ASF)"
+  find . -iname '*' -type f | grep -v NOTICE | grep -v LICENSE | grep -v 
'.jpg' | grep -v '.json' | grep -v '.hfile' | grep -v '.data' | grep -v 
'.commit' | grep -v emptyFile | grep -v DISCLAIMER | grep -v '.sqltemplate' | 
grep -v KEYS | grep -v '.mailmap' | grep -v 'banner.txt' | grep -v '.txt' | 
grep -v "fixtures" | xargs grep -L "Licensed to the Apache Software Foundation 
(ASF)"
   exit 1
 fi
 echo -e "\t\tLicensing Check Passed [OK]\n"

Reply via email to