This is an automated email from the ASF dual-hosted git repository. leesf pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push: new 4fbf5c52f19 [HUDI-7601] Add heartbeat mechanism to refresh lock (#10994) 4fbf5c52f19 is described below commit 4fbf5c52f19b1e3192b09f9362e35dd22c3a0da6 Author: Yann Byron <biyan900...@gmail.com> AuthorDate: Fri Apr 12 14:12:04 2024 +0800 [HUDI-7601] Add heartbeat mechanism to refresh lock (#10994) * [HUDI-7601] Add heartbeat mechanism to refresh lock --- .../org/apache/hudi/config/HoodieLockConfig.java | 13 +++++++ .../hudi/common/config/LockConfiguration.java | 3 ++ .../hudi/hive/transaction/lock/Heartbeat.java | 42 ++++++++++++++++++++++ .../lock/HiveMetastoreBasedLockProvider.java | 23 ++++++++++-- 4 files changed, 79 insertions(+), 2 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieLockConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieLockConfig.java index fa38da8f8ab..1c51b6db8b3 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieLockConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieLockConfig.java @@ -37,6 +37,7 @@ import java.util.Properties; import static org.apache.hudi.common.config.LockConfiguration.DEFAULT_LOCK_ACQUIRE_NUM_RETRIES; import static org.apache.hudi.common.config.LockConfiguration.DEFAULT_LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS; import static org.apache.hudi.common.config.LockConfiguration.DEFAULT_LOCK_ACQUIRE_WAIT_TIMEOUT_MS; +import static org.apache.hudi.common.config.LockConfiguration.DEFAULT_LOCK_HEARTBEAT_INTERVAL_MS; import static org.apache.hudi.common.config.LockConfiguration.DEFAULT_ZK_CONNECTION_TIMEOUT_MS; import static org.apache.hudi.common.config.LockConfiguration.DEFAULT_ZK_SESSION_TIMEOUT_MS; import static org.apache.hudi.common.config.LockConfiguration.FILESYSTEM_LOCK_EXPIRE_PROP_KEY; @@ -50,6 +51,7 @@ import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_NUM_R import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_RETRY_MAX_WAIT_TIME_IN_MILLIS_PROP_KEY; import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY; import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY; +import static org.apache.hudi.common.config.LockConfiguration.LOCK_HEARTBEAT_INTERVAL_MS_KEY; import static org.apache.hudi.common.config.LockConfiguration.LOCK_PREFIX; import static org.apache.hudi.common.config.LockConfiguration.ZK_BASE_PATH_PROP_KEY; import static org.apache.hudi.common.config.LockConfiguration.ZK_CONNECTION_TIMEOUT_MS_PROP_KEY; @@ -112,6 +114,12 @@ public class HoodieLockConfig extends HoodieConfig { .sinceVersion("0.8.0") .withDocumentation("Timeout in ms, to wait on an individual lock acquire() call, at the lock provider."); + public static final ConfigProperty<Integer> LOCK_HEARTBEAT_INTERVAL_MS = ConfigProperty + .key(LOCK_HEARTBEAT_INTERVAL_MS_KEY) + .defaultValue(DEFAULT_LOCK_HEARTBEAT_INTERVAL_MS) + .sinceVersion("1.0.0") + .withDocumentation("Heartbeat interval in ms, to send a heartbeat to indicate that hive client holding locks."); + public static final ConfigProperty<String> FILESYSTEM_LOCK_PATH = ConfigProperty .key(FILESYSTEM_LOCK_PATH_PROP_KEY) .noDefaultValue() @@ -343,6 +351,11 @@ public class HoodieLockConfig extends HoodieConfig { return this; } + public HoodieLockConfig.Builder withHeartbeatIntervalInMillis(Long intervalInMillis) { + lockConfig.setValue(LOCK_HEARTBEAT_INTERVAL_MS, String.valueOf(intervalInMillis)); + return this; + } + public HoodieLockConfig.Builder withConflictResolutionStrategy(ConflictResolutionStrategy conflictResolutionStrategy) { lockConfig.setValue(WRITE_CONFLICT_RESOLUTION_STRATEGY_CLASS_NAME, conflictResolutionStrategy.getClass().getName()); return this; diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/LockConfiguration.java b/hudi-common/src/main/java/org/apache/hudi/common/config/LockConfiguration.java index 1171dcf3fce..9d79be37810 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/config/LockConfiguration.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/config/LockConfiguration.java @@ -45,6 +45,9 @@ public class LockConfiguration implements Serializable { public static final String LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY = LOCK_PREFIX + "wait_time_ms"; public static final int DEFAULT_LOCK_ACQUIRE_WAIT_TIMEOUT_MS = 60 * 1000; + public static final String LOCK_HEARTBEAT_INTERVAL_MS_KEY = LOCK_PREFIX + "heartbeat_interval_ms"; + public static final int DEFAULT_LOCK_HEARTBEAT_INTERVAL_MS = 60 * 1000; + // configs for file system based locks. NOTE: This only works for DFS with atomic create/delete operation public static final String FILESYSTEM_BASED_LOCK_PROPERTY_PREFIX = LOCK_PREFIX + "filesystem."; diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/transaction/lock/Heartbeat.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/transaction/lock/Heartbeat.java new file mode 100644 index 00000000000..14398af2c74 --- /dev/null +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/transaction/lock/Heartbeat.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.hive.transaction.lock; + +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hudi.exception.HoodieLockException; + +class Heartbeat implements Runnable { + private final IMetaStoreClient client; + private final long lockId; + + Heartbeat(IMetaStoreClient client, long lockId) { + this.client = client; + this.lockId = lockId; + } + + @Override + public void run() { + try { + client.heartbeat(0, lockId); + } catch (Exception e) { + throw new HoodieLockException(String.format("Failed to heartbeat for lock: %d", lockId)); + } + } +} diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/transaction/lock/HiveMetastoreBasedLockProvider.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/transaction/lock/HiveMetastoreBasedLockProvider.java index 0280621bb53..4c5aa5cb4f7 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/transaction/lock/HiveMetastoreBasedLockProvider.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/transaction/lock/HiveMetastoreBasedLockProvider.java @@ -44,16 +44,19 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import static org.apache.hudi.common.config.LockConfiguration.DEFAULT_LOCK_HEARTBEAT_INTERVAL_MS; import static org.apache.hudi.common.config.LockConfiguration.HIVE_DATABASE_NAME_PROP_KEY; import static org.apache.hudi.common.config.LockConfiguration.HIVE_METASTORE_URI_PROP_KEY; import static org.apache.hudi.common.config.LockConfiguration.HIVE_TABLE_NAME_PROP_KEY; import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_NUM_RETRIES_PROP_KEY; import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY; +import static org.apache.hudi.common.config.LockConfiguration.LOCK_HEARTBEAT_INTERVAL_MS_KEY; import static org.apache.hudi.common.config.LockConfiguration.ZK_CONNECT_URL_PROP_KEY; import static org.apache.hudi.common.config.LockConfiguration.ZK_PORT_PROP_KEY; import static org.apache.hudi.common.config.LockConfiguration.ZK_SESSION_TIMEOUT_MS_PROP_KEY; @@ -81,7 +84,8 @@ public class HiveMetastoreBasedLockProvider implements LockProvider<LockResponse private IMetaStoreClient hiveClient; private volatile LockResponse lock = null; protected LockConfiguration lockConfiguration; - ExecutorService executor = Executors.newSingleThreadExecutor(); + private ScheduledFuture<?> future = null; + private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(2); public HiveMetastoreBasedLockProvider(final LockConfiguration lockConfiguration, final Configuration conf) { this(lockConfiguration); @@ -128,6 +132,9 @@ public class HiveMetastoreBasedLockProvider implements LockProvider<LockResponse return; } lock = null; + if (future != null) { + future.cancel(false); + } hiveClient.unlock(lockResponseLocal.getLockid()); LOG.info(generateLogStatement(RELEASED, generateLogSuffixString())); } catch (TException e) { @@ -153,6 +160,9 @@ public class HiveMetastoreBasedLockProvider implements LockProvider<LockResponse hiveClient.unlock(lock.getLockid()); lock = null; } + if (future != null) { + future.cancel(false); + } Hive.closeCurrent(); executor.shutdown(); } catch (Exception e) { @@ -188,6 +198,12 @@ public class HiveMetastoreBasedLockProvider implements LockProvider<LockResponse final LockRequest lockRequestFinal = lockRequest; this.lock = executor.submit(() -> hiveClient.lock(lockRequestFinal)) .get(time, unit); + + // refresh lock in case that certain commit takes a long time. + Heartbeat heartbeat = new Heartbeat(hiveClient, lock.getLockid()); + long heartbeatIntervalMs = lockConfiguration.getConfig() + .getLong(LOCK_HEARTBEAT_INTERVAL_MS_KEY, DEFAULT_LOCK_HEARTBEAT_INTERVAL_MS); + future = executor.scheduleAtFixedRate(heartbeat, heartbeatIntervalMs / 2, heartbeatIntervalMs, TimeUnit.MILLISECONDS); } catch (InterruptedException | TimeoutException e) { if (this.lock == null || this.lock.getState() != LockState.ACQUIRED) { LockResponse lockResponse = this.hiveClient.checkLock(lockRequest.getTxnid()); @@ -202,6 +218,9 @@ public class HiveMetastoreBasedLockProvider implements LockProvider<LockResponse if (this.lock != null && this.lock.getState() != LockState.ACQUIRED) { hiveClient.unlock(this.lock.getLockid()); lock = null; + if (future != null) { + future.cancel(false); + } } } }