[GitHub] [hudi] nsivabalan commented on a diff in pull request #6071: [HUDI-4065] Add FileBasedLockProvider

2023-01-03 Thread GitBox


nsivabalan commented on code in PR #6071:
URL: https://github.com/apache/hudi/pull/6071#discussion_r1061068779


##
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/FileSystemBasedLockProvider.java:
##
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.client.transaction.lock;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.config.LockConfiguration;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.lock.LockProvider;
+import org.apache.hudi.common.lock.LockState;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.exception.HoodieLockException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.concurrent.TimeUnit;
+
+import static 
org.apache.hudi.common.config.LockConfiguration.FILESYSTEM_LOCK_EXPIRE_PROP_KEY;
+import static 
org.apache.hudi.common.config.LockConfiguration.FILESYSTEM_LOCK_PATH_PROP_KEY;
+
+/**
+ * A FileSystem based lock. This {@link LockProvider} implementation allows locking table operations
+ * using DFS. Users might need to manually clean the Locker's path if the writeClient crashes and never runs again.
+ * NOTE: This only works for DFS with atomic create/delete operations.
+ */
+public class FileSystemBasedLockProvider implements LockProvider, 
Serializable {
+
+  // Class logger.
+  private static final Logger LOG = 
LogManager.getLogger(FileSystemBasedLockProvider.class);
+
+  // File name appended to the lock directory to form the lock file path. The same constant is
+  // also the monitor object of the synchronized blocks in close(), tryLock() and unlock().
+  // NOTE(review): a String literal is interned, so this monitor is shared with any other code in
+  // the JVM that synchronizes on the literal "lock" - consider a dedicated lock object.
+  private static final String LOCK_FILE_NAME = "lock";
+
+  // Value of FILESYSTEM_LOCK_EXPIRE_PROP_KEY; checkIfExpired() special-cases 0.
+  private final int lockTimeoutMinutes;
+  // Transient: excluded from Java serialization. Both are assigned in the constructor.
+  // NOTE(review): presumably null after deserialization unless restored elsewhere - confirm.
+  private transient FileSystem fs;
+  private transient Path lockFile;
+  // Configuration this provider was created with.
+  protected LockConfiguration lockConfiguration;
+
+  /**
+   * Runs checkRequiredProps on the lock configuration, resolves the lock file path and
+   * obtains the FileSystem for that path.
+   *
+   * @param lockConfiguration source of the lock directory, base path and expiry settings
+   * @param configuration Hadoop configuration handed to FSUtils.getFs
+   */
+  public FileSystemBasedLockProvider(final LockConfiguration 
lockConfiguration, final Configuration configuration) {
+// Defined outside this excerpt; presumably rejects configs missing required keys - confirm.
+checkRequiredProps(lockConfiguration);
+this.lockConfiguration = lockConfiguration;
+String lockDirectory = 
lockConfiguration.getConfig().getString(FILESYSTEM_LOCK_PATH_PROP_KEY, null);
+// No explicit lock directory: fall back to <base path>/<METAFOLDER_NAME>.
+if (StringUtils.isNullOrEmpty(lockDirectory)) {
+  lockDirectory = 
lockConfiguration.getConfig().getString(HoodieWriteConfig.BASE_PATH.key())
++ Path.SEPARATOR + HoodieTableMetaClient.METAFOLDER_NAME;
+}
+this.lockTimeoutMinutes = 
lockConfiguration.getConfig().getInteger(FILESYSTEM_LOCK_EXPIRE_PROP_KEY);
+this.lockFile = new Path(lockDirectory + Path.SEPARATOR + LOCK_FILE_NAME);
+this.fs = FSUtils.getFs(this.lockFile.toString(), configuration);
+  }
+
+  /**
+   * Deletes the lock file while holding the LOCK_FILE_NAME monitor.
+   * Unlike unlock(), the delete is issued without an existence check.
+   * NOTE(review): nothing here checks that this instance created the lock file - confirm that
+   * closing a provider that never held the lock cannot remove another writer's lock.
+   *
+   * @throws HoodieLockException if the delete fails with an IOException
+   */
+  @Override
+  public void close() {
+synchronized (LOCK_FILE_NAME) {
+  try {
+fs.delete(this.lockFile, true);
+  } catch (IOException e) {
+throw new 
HoodieLockException(generateLogStatement(LockState.FAILED_TO_RELEASE), e);
+  }
+}
+  }
+
+  /**
+   * Attempts to take the lock once. An existing lock file for which checkIfExpired() returns
+   * true is deleted first; acquireLock() is then called and the result is whether the lock
+   * file exists afterwards.
+   * NOTE(review): time and unit are not referenced in this method.
+   *
+   * @param time not used by this method
+   * @param unit not used by this method
+   * @return result of fs.exists on the lock file after acquireLock(); false if an IOException
+   *         or HoodieIOException was caught (logged at INFO)
+   */
+  @Override
+  public boolean tryLock(long time, TimeUnit unit) {
+try {
+  synchronized (LOCK_FILE_NAME) {
+// Check whether lock is already expired, if so try to delete lock file
+if (fs.exists(this.lockFile) && checkIfExpired()) {
+  fs.delete(this.lockFile, true);
+}
+// Defined outside this excerpt; presumably creates the lock file - confirm.
+acquireLock();
+return fs.exists(this.lockFile);
+  }
+} catch (IOException | HoodieIOException e) {
+  LOG.info(generateLogStatement(LockState.FAILED_TO_ACQUIRE), e);
+  return false;
+}
+  }
+
+  /**
+   * Deletes the lock file if it exists, while holding the LOCK_FILE_NAME monitor.
+   *
+   * @throws HoodieIOException if the existence check or the delete fails with an IOException
+   */
+  @Override
+  public void unlock() {
+synchronized (LOCK_FILE_NAME) {
+  try {
+if (fs.exists(this.lockFile)) {
+  fs.delete(this.lockFile, true);
+}
+  } catch (IOException io) {
+throw new 
HoodieIOException(generateLogStatement(LockState.FAILED_TO_RELEASE), io);
+  }
+}
+  }
+
+  /**
+   * @return the lock file path as a string
+   */
+  @Override
+  public String getLock() {
+return this.lockFile.toString();
+  }
+
+  private boolean checkIfExpired() {
+if (lockTimeoutMinutes == 0) {
+ 

[GitHub] [hudi] nsivabalan commented on a diff in pull request #6071: [HUDI-4065] Add FileBasedLockProvider

2023-01-03 Thread GitBox


nsivabalan commented on code in PR #6071:
URL: https://github.com/apache/hudi/pull/6071#discussion_r1061067820


##
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/FileSystemBasedLockProvider.java:
##
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.client.transaction.lock;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.config.LockConfiguration;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.lock.LockProvider;
+import org.apache.hudi.common.lock.LockState;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.exception.HoodieLockException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.concurrent.TimeUnit;
+
+import static 
org.apache.hudi.common.config.LockConfiguration.FILESYSTEM_LOCK_EXPIRE_PROP_KEY;
+import static 
org.apache.hudi.common.config.LockConfiguration.FILESYSTEM_LOCK_PATH_PROP_KEY;
+
+/**
+ * A FileSystem based lock. This {@link LockProvider} implementation allows locking table operations
+ * using DFS. Users might need to manually clean the Locker's path if the writeClient crashes and never runs again.
+ * NOTE: This only works for DFS with atomic create/delete operations.
+ */
+public class FileSystemBasedLockProvider implements LockProvider, 
Serializable {
+
+  // Class logger.
+  private static final Logger LOG = 
LogManager.getLogger(FileSystemBasedLockProvider.class);
+
+  // File name appended to the lock directory to form the lock file path. The same constant is
+  // also the monitor object of the synchronized blocks in close(), tryLock() and unlock().
+  // NOTE(review): a String literal is interned, so this monitor is shared with any other code in
+  // the JVM that synchronizes on the literal "lock" - consider a dedicated lock object.
+  private static final String LOCK_FILE_NAME = "lock";
+
+  // Value of FILESYSTEM_LOCK_EXPIRE_PROP_KEY; checkIfExpired() special-cases 0.
+  private final int lockTimeoutMinutes;
+  // Transient: excluded from Java serialization. Both are assigned in the constructor.
+  // NOTE(review): presumably null after deserialization unless restored elsewhere - confirm.
+  private transient FileSystem fs;
+  private transient Path lockFile;
+  // Configuration this provider was created with.
+  protected LockConfiguration lockConfiguration;
+
+  /**
+   * Runs checkRequiredProps on the lock configuration, resolves the lock file path and
+   * obtains the FileSystem for that path.
+   *
+   * @param lockConfiguration source of the lock directory, base path and expiry settings
+   * @param configuration Hadoop configuration handed to FSUtils.getFs
+   */
+  public FileSystemBasedLockProvider(final LockConfiguration 
lockConfiguration, final Configuration configuration) {
+// Defined outside this excerpt; presumably rejects configs missing required keys - confirm.
+checkRequiredProps(lockConfiguration);
+this.lockConfiguration = lockConfiguration;
+String lockDirectory = 
lockConfiguration.getConfig().getString(FILESYSTEM_LOCK_PATH_PROP_KEY, null);
+// No explicit lock directory: fall back to <base path>/<METAFOLDER_NAME>.
+if (StringUtils.isNullOrEmpty(lockDirectory)) {
+  lockDirectory = 
lockConfiguration.getConfig().getString(HoodieWriteConfig.BASE_PATH.key())
++ Path.SEPARATOR + HoodieTableMetaClient.METAFOLDER_NAME;
+}
+this.lockTimeoutMinutes = 
lockConfiguration.getConfig().getInteger(FILESYSTEM_LOCK_EXPIRE_PROP_KEY);
+this.lockFile = new Path(lockDirectory + Path.SEPARATOR + LOCK_FILE_NAME);
+this.fs = FSUtils.getFs(this.lockFile.toString(), configuration);
+  }
+
+  /**
+   * Deletes the lock file while holding the LOCK_FILE_NAME monitor.
+   * Unlike unlock(), the delete is issued without an existence check.
+   * NOTE(review): nothing here checks that this instance created the lock file - confirm that
+   * closing a provider that never held the lock cannot remove another writer's lock.
+   *
+   * @throws HoodieLockException if the delete fails with an IOException
+   */
+  @Override
+  public void close() {
+synchronized (LOCK_FILE_NAME) {
+  try {
+fs.delete(this.lockFile, true);
+  } catch (IOException e) {
+throw new 
HoodieLockException(generateLogStatement(LockState.FAILED_TO_RELEASE), e);
+  }
+}
+  }
+
+  /**
+   * Attempts to take the lock once. An existing lock file for which checkIfExpired() returns
+   * true is deleted first; acquireLock() is then called and the result is whether the lock
+   * file exists afterwards.
+   * NOTE(review): time and unit are not referenced in this method.
+   *
+   * @param time not used by this method
+   * @param unit not used by this method
+   * @return result of fs.exists on the lock file after acquireLock(); false if an IOException
+   *         or HoodieIOException was caught (logged at INFO)
+   */
+  @Override
+  public boolean tryLock(long time, TimeUnit unit) {
+try {
+  synchronized (LOCK_FILE_NAME) {
+// Check whether lock is already expired, if so try to delete lock file
+if (fs.exists(this.lockFile) && checkIfExpired()) {
+  fs.delete(this.lockFile, true);
+}
+// Defined outside this excerpt; presumably creates the lock file - confirm.
+acquireLock();
+return fs.exists(this.lockFile);
+  }
+} catch (IOException | HoodieIOException e) {
+  LOG.info(generateLogStatement(LockState.FAILED_TO_ACQUIRE), e);
+  return false;
+}
+  }
+
+  /**
+   * Deletes the lock file if it exists, while holding the LOCK_FILE_NAME monitor.
+   *
+   * @throws HoodieIOException if the existence check or the delete fails with an IOException
+   */
+  @Override
+  public void unlock() {
+synchronized (LOCK_FILE_NAME) {
+  try {
+if (fs.exists(this.lockFile)) {
+  fs.delete(this.lockFile, true);
+}
+  } catch (IOException io) {
+throw new 
HoodieIOException(generateLogStatement(LockState.FAILED_TO_RELEASE), io);
+  }
+}
+  }
+
+  /**
+   * @return the lock file path as a string
+   */
+  @Override
+  public String getLock() {
+return this.lockFile.toString();
+  }
+
+  private boolean checkIfExpired() {
+if (lockTimeoutMinutes == 0) {
+