This is an automated email from the ASF dual-hosted git repository.

leesf pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 4fbf5c52f19 [HUDI-7601] Add heartbeat mechanism to refresh lock 
(#10994)
4fbf5c52f19 is described below

commit 4fbf5c52f19b1e3192b09f9362e35dd22c3a0da6
Author: Yann Byron <biyan900...@gmail.com>
AuthorDate: Fri Apr 12 14:12:04 2024 +0800

    [HUDI-7601] Add heartbeat mechanism to refresh lock (#10994)
    
    * [HUDI-7601] Add heartbeat mechanism to refresh lock
---
 .../org/apache/hudi/config/HoodieLockConfig.java   | 13 +++++++
 .../hudi/common/config/LockConfiguration.java      |  3 ++
 .../hudi/hive/transaction/lock/Heartbeat.java      | 42 ++++++++++++++++++++++
 .../lock/HiveMetastoreBasedLockProvider.java       | 23 ++++++++++--
 4 files changed, 79 insertions(+), 2 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieLockConfig.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieLockConfig.java
index fa38da8f8ab..1c51b6db8b3 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieLockConfig.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieLockConfig.java
@@ -37,6 +37,7 @@ import java.util.Properties;
 import static 
org.apache.hudi.common.config.LockConfiguration.DEFAULT_LOCK_ACQUIRE_NUM_RETRIES;
 import static 
org.apache.hudi.common.config.LockConfiguration.DEFAULT_LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS;
 import static 
org.apache.hudi.common.config.LockConfiguration.DEFAULT_LOCK_ACQUIRE_WAIT_TIMEOUT_MS;
+import static 
org.apache.hudi.common.config.LockConfiguration.DEFAULT_LOCK_HEARTBEAT_INTERVAL_MS;
 import static 
org.apache.hudi.common.config.LockConfiguration.DEFAULT_ZK_CONNECTION_TIMEOUT_MS;
 import static 
org.apache.hudi.common.config.LockConfiguration.DEFAULT_ZK_SESSION_TIMEOUT_MS;
 import static 
org.apache.hudi.common.config.LockConfiguration.FILESYSTEM_LOCK_EXPIRE_PROP_KEY;
@@ -50,6 +51,7 @@ import static 
org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_NUM_R
 import static 
org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_RETRY_MAX_WAIT_TIME_IN_MILLIS_PROP_KEY;
 import static 
org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY;
 import static 
org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY;
+import static 
org.apache.hudi.common.config.LockConfiguration.LOCK_HEARTBEAT_INTERVAL_MS_KEY;
 import static org.apache.hudi.common.config.LockConfiguration.LOCK_PREFIX;
 import static 
org.apache.hudi.common.config.LockConfiguration.ZK_BASE_PATH_PROP_KEY;
 import static 
org.apache.hudi.common.config.LockConfiguration.ZK_CONNECTION_TIMEOUT_MS_PROP_KEY;
@@ -112,6 +114,12 @@ public class HoodieLockConfig extends HoodieConfig {
       .sinceVersion("0.8.0")
       .withDocumentation("Timeout in ms, to wait on an individual lock 
acquire() call, at the lock provider.");
 
+  /**
+   * Interval, in milliseconds, between lock heartbeats. The property is keyed by
+   * {@code LOCK_HEARTBEAT_INTERVAL_MS_KEY} and falls back to
+   * {@code DEFAULT_LOCK_HEARTBEAT_INTERVAL_MS} when not set.
+   */
+  public static final ConfigProperty<Integer> LOCK_HEARTBEAT_INTERVAL_MS = 
ConfigProperty
+      .key(LOCK_HEARTBEAT_INTERVAL_MS_KEY)
+      .defaultValue(DEFAULT_LOCK_HEARTBEAT_INTERVAL_MS)
+      .sinceVersion("1.0.0")
+      .withDocumentation("Heartbeat interval in ms, to send a heartbeat to 
indicate that hive client holding locks.");
+
   public static final ConfigProperty<String> FILESYSTEM_LOCK_PATH = 
ConfigProperty
       .key(FILESYSTEM_LOCK_PATH_PROP_KEY)
       .noDefaultValue()
@@ -343,6 +351,11 @@ public class HoodieLockConfig extends HoodieConfig {
       return this;
     }
 
+    /**
+     * Sets the lock heartbeat interval on the config being built.
+     *
+     * <p>The value is stored under {@code LOCK_HEARTBEAT_INTERVAL_MS} as its
+     * {@code String.valueOf} representation.
+     *
+     * <p>NOTE(review): the parameter is a {@code Long} while the property is declared as
+     * {@code ConfigProperty<Integer>}; confirm the intended value type.
+     *
+     * @param intervalInMillis heartbeat interval in milliseconds
+     * @return this builder, to allow call chaining
+     */
+    public HoodieLockConfig.Builder withHeartbeatIntervalInMillis(Long 
intervalInMillis) {
+      lockConfig.setValue(LOCK_HEARTBEAT_INTERVAL_MS, 
String.valueOf(intervalInMillis));
+      return this;
+    }
+
     public HoodieLockConfig.Builder 
withConflictResolutionStrategy(ConflictResolutionStrategy 
conflictResolutionStrategy) {
       lockConfig.setValue(WRITE_CONFLICT_RESOLUTION_STRATEGY_CLASS_NAME, 
conflictResolutionStrategy.getClass().getName());
       return this;
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/config/LockConfiguration.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/config/LockConfiguration.java
index 1171dcf3fce..9d79be37810 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/config/LockConfiguration.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/config/LockConfiguration.java
@@ -45,6 +45,9 @@ public class LockConfiguration implements Serializable {
   public static final String LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY = 
LOCK_PREFIX + "wait_time_ms";
   public static final int DEFAULT_LOCK_ACQUIRE_WAIT_TIMEOUT_MS = 60 * 1000;
 
+  public static final String LOCK_HEARTBEAT_INTERVAL_MS_KEY = LOCK_PREFIX + 
"heartbeat_interval_ms";
+  public static final int DEFAULT_LOCK_HEARTBEAT_INTERVAL_MS = 60 * 1000;
+
   // configs for file system based locks. NOTE: This only works for DFS with 
atomic create/delete operation
   public static final String FILESYSTEM_BASED_LOCK_PROPERTY_PREFIX = 
LOCK_PREFIX + "filesystem.";
 
diff --git 
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/transaction/lock/Heartbeat.java
 
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/transaction/lock/Heartbeat.java
new file mode 100644
index 00000000000..14398af2c74
--- /dev/null
+++ 
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/transaction/lock/Heartbeat.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.hive.transaction.lock;
+
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hudi.exception.HoodieLockException;
+
+/**
+ * Task that sends a single heartbeat to the Hive metastore for one lock id each
+ * time it runs, so that a lock held across a long-running commit is refreshed.
+ */
+class Heartbeat implements Runnable {
+  private final IMetaStoreClient client;
+  private final long lockId;
+
+  /**
+   * @param client metastore client used to send the heartbeat
+   * @param lockId id of the lock to keep alive
+   */
+  Heartbeat(IMetaStoreClient client, long lockId) {
+    this.client = client;
+    this.lockId = lockId;
+  }
+
+  /**
+   * Sends one heartbeat for {@code lockId}.
+   *
+   * @throws HoodieLockException if the metastore call fails; the original
+   *         failure is attached as the cause
+   */
+  @Override
+  public void run() {
+    try {
+      // First argument is the transaction id in Hive's heartbeat API; 0 is passed
+      // because only the lock is being refreshed here.
+      client.heartbeat(0, lockId);
+    } catch (Exception e) {
+      // Keep the underlying failure as the cause so it is not lost when diagnosing.
+      // NOTE(review): when this task is run via scheduleAtFixedRate, a thrown
+      // exception suppresses all subsequent executions and only surfaces through
+      // the returned future - confirm that stopping the heartbeat is intended.
+      throw new HoodieLockException(
+          String.format("Failed to heartbeat for lock: %d", lockId), e);
+    }
+  }
+}
diff --git 
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/transaction/lock/HiveMetastoreBasedLockProvider.java
 
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/transaction/lock/HiveMetastoreBasedLockProvider.java
index 0280621bb53..4c5aa5cb4f7 100644
--- 
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/transaction/lock/HiveMetastoreBasedLockProvider.java
+++ 
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/transaction/lock/HiveMetastoreBasedLockProvider.java
@@ -44,16 +44,19 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
+import static 
org.apache.hudi.common.config.LockConfiguration.DEFAULT_LOCK_HEARTBEAT_INTERVAL_MS;
 import static 
org.apache.hudi.common.config.LockConfiguration.HIVE_DATABASE_NAME_PROP_KEY;
 import static 
org.apache.hudi.common.config.LockConfiguration.HIVE_METASTORE_URI_PROP_KEY;
 import static 
org.apache.hudi.common.config.LockConfiguration.HIVE_TABLE_NAME_PROP_KEY;
 import static 
org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_NUM_RETRIES_PROP_KEY;
 import static 
org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY;
+import static 
org.apache.hudi.common.config.LockConfiguration.LOCK_HEARTBEAT_INTERVAL_MS_KEY;
 import static 
org.apache.hudi.common.config.LockConfiguration.ZK_CONNECT_URL_PROP_KEY;
 import static org.apache.hudi.common.config.LockConfiguration.ZK_PORT_PROP_KEY;
 import static 
org.apache.hudi.common.config.LockConfiguration.ZK_SESSION_TIMEOUT_MS_PROP_KEY;
@@ -81,7 +84,8 @@ public class HiveMetastoreBasedLockProvider implements 
LockProvider<LockResponse
   private IMetaStoreClient hiveClient;
   private volatile LockResponse lock = null;
   protected LockConfiguration lockConfiguration;
-  ExecutorService executor = Executors.newSingleThreadExecutor();
+  private ScheduledFuture<?> future = null;
+  private final ScheduledExecutorService executor = 
Executors.newScheduledThreadPool(2);
 
   public HiveMetastoreBasedLockProvider(final LockConfiguration 
lockConfiguration, final Configuration conf) {
     this(lockConfiguration);
@@ -128,6 +132,9 @@ public class HiveMetastoreBasedLockProvider implements 
LockProvider<LockResponse
         return;
       }
       lock = null;
+      if (future != null) {
+        future.cancel(false);
+      }
       hiveClient.unlock(lockResponseLocal.getLockid());
       LOG.info(generateLogStatement(RELEASED, generateLogSuffixString()));
     } catch (TException e) {
@@ -153,6 +160,9 @@ public class HiveMetastoreBasedLockProvider implements 
LockProvider<LockResponse
         hiveClient.unlock(lock.getLockid());
         lock = null;
       }
+      if (future != null) {
+        future.cancel(false);
+      }
       Hive.closeCurrent();
       executor.shutdown();
     } catch (Exception e) {
@@ -188,6 +198,12 @@ public class HiveMetastoreBasedLockProvider implements 
LockProvider<LockResponse
       final LockRequest lockRequestFinal = lockRequest;
       this.lock = executor.submit(() -> hiveClient.lock(lockRequestFinal))
           .get(time, unit);
+
+      // refresh lock in case that certain commit takes a long time.
+      Heartbeat heartbeat = new Heartbeat(hiveClient, lock.getLockid());
+      long heartbeatIntervalMs = lockConfiguration.getConfig()
+          .getLong(LOCK_HEARTBEAT_INTERVAL_MS_KEY, 
DEFAULT_LOCK_HEARTBEAT_INTERVAL_MS);
+      future = executor.scheduleAtFixedRate(heartbeat, heartbeatIntervalMs / 
2, heartbeatIntervalMs, TimeUnit.MILLISECONDS);
     } catch (InterruptedException | TimeoutException e) {
       if (this.lock == null || this.lock.getState() != LockState.ACQUIRED) {
         LockResponse lockResponse = 
this.hiveClient.checkLock(lockRequest.getTxnid());
@@ -202,6 +218,9 @@ public class HiveMetastoreBasedLockProvider implements 
LockProvider<LockResponse
       if (this.lock != null && this.lock.getState() != LockState.ACQUIRED) {
         hiveClient.unlock(this.lock.getLockid());
         lock = null;
+        if (future != null) {
+          future.cancel(false);
+        }
       }
     }
   }

Reply via email to