rdblue commented on a change in pull request #1823:
URL: https://github.com/apache/iceberg/pull/1823#discussion_r552087542



##########
File path: aws/src/main/java/org/apache/iceberg/aws/glue/LockManagers.java
##########
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.aws.glue;
+
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.common.DynConstructors;
+import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.Tasks;
+
+class LockManagers {
+
+  private static final LockManager LOCK_MANAGER_DEFAULT = new 
InMemoryLockManager(Maps.newHashMap());
+
+  private LockManagers() {
+  }
+
+  public static LockManager defaultLockManager() {
+    return LOCK_MANAGER_DEFAULT;
+  }
+
+  public static LockManager from(Map<String, String> properties) {
+    if (properties.containsKey(CatalogProperties.LOCK_IMPL)) {
+      return loadLockManager(properties.get(CatalogProperties.LOCK_IMPL), 
properties);
+    } else {
+      return defaultLockManager();
+    }
+  }
+
+  private static LockManager loadLockManager(String impl, Map<String, String> 
properties) {
+    DynConstructors.Ctor<LockManager> ctor;
+    try {
+      ctor = 
DynConstructors.builder(LockManager.class).hiddenImpl(impl).buildChecked();
+    } catch (NoSuchMethodException e) {
+      throw new IllegalArgumentException(String.format(
+          "Cannot initialize LockManager, missing no-arg constructor: %s", 
impl), e);
+    }
+
+    LockManager lockManager;
+    try {
+      lockManager = ctor.newInstance();
+    } catch (ClassCastException e) {
+      throw new IllegalArgumentException(
+          String.format("Cannot initialize LockManager, %s does not implement 
LockManager.", impl), e);
+    }
+
+    lockManager.initialize(properties);
+    return lockManager;
+  }
+
+  abstract static class BaseLockManager implements LockManager {
+
+    private static volatile ScheduledExecutorService scheduler;
+
+    private long acquireTimeoutMs;
+    private long acquireIntervalMs;
+    private long heartbeatIntervalMs;
+    private long heartbeatTimeoutMs;
+    private int heartbeatThreads;
+
+    public long heartbeatTimeoutMs() {
+      return heartbeatTimeoutMs;
+    }
+
+    public long heartbeatIntervalMs() {
+      return heartbeatIntervalMs;
+    }
+
+    public long acquireIntervalMs() {
+      return acquireIntervalMs;
+    }
+
+    public long acquireTimeoutMs() {
+      return acquireTimeoutMs;
+    }
+
+    public int heartbeatThreads() {
+      return heartbeatThreads;
+    }
+
+    @SuppressWarnings("StaticGuardedByInstance")
+    public ScheduledExecutorService scheduler() {
+      if (scheduler == null) {
+        synchronized (this) {
+          if (scheduler == null) {
+            scheduler = Executors.newScheduledThreadPool(heartbeatThreads);
+          }
+        }
+      }
+      return scheduler;
+    }
+
+    @Override
+    public void initialize(Map<String, String> properties) {
+      this.acquireTimeoutMs = PropertyUtil.propertyAsLong(properties,
+          CatalogProperties.LOCK_ACQUIRE_TIMEOUT_MS, 
CatalogProperties.LOCK_ACQUIRE_TIMEOUT_MS_DEFAULT);
+      this.acquireIntervalMs = PropertyUtil.propertyAsLong(properties,
+          CatalogProperties.LOCK_ACQUIRE_INTERVAL_MS, 
CatalogProperties.LOCK_ACQUIRE_INTERVAL_MS_DEFAULT);
+      this.heartbeatIntervalMs = PropertyUtil.propertyAsLong(properties,
+          CatalogProperties.LOCK_HEARTBEAT_INTERVAL_MS, 
CatalogProperties.LOCK_HEARTBEAT_INTERVAL_MS_DEFAULT);
+      this.heartbeatTimeoutMs = PropertyUtil.propertyAsLong(properties,
+          CatalogProperties.LOCK_HEARTBEAT_TIMEOUT_MS, 
CatalogProperties.LOCK_HEARTBEAT_TIMEOUT_MS_DEFAULT);
+      this.heartbeatThreads = PropertyUtil.propertyAsInt(properties,
+          CatalogProperties.LOCK_HEARTBEAT_THREADS, 
CatalogProperties.LOCK_HEARTBEAT_THREADS_DEFAULT);
+    }
+  }
+
+  /**
+   * Implementation of {@link LockManager} that uses an in-memory concurrent 
map for locking.
+   * This implementation should only be used for testing,
+   * or if the caller only needs locking within the same JVM during table 
commits.
+   */
+  static class InMemoryLockManager extends BaseLockManager {
+
+    private static final Map<String, DefaultLockContent> LOCKS = 
Maps.newConcurrentMap();
+    private static final Map<String, ScheduledFuture<?>> HEARTBEATS = 
Maps.newHashMap();
+
+    InMemoryLockManager(Map<String, String> properties) {
+      initialize(properties);
+    }
+
+    @VisibleForTesting
+    void acquireOnce(String entityId, String ownerId) {
+      DefaultLockContent content = LOCKS.get(entityId);
+      if (content != null && content.expireMs() > System.currentTimeMillis()) {
+        throw new IllegalStateException(String.format("Lock for %s currently 
held by %s, expiration: %s",
+            entityId, content.ownerId(), content.expireMs()));
+      }
+
+      long expiration = System.currentTimeMillis() + heartbeatTimeoutMs();
+      boolean succeed;
+      if (content == null) {
+        DefaultLockContent previous = LOCKS.putIfAbsent(
+            entityId, new DefaultLockContent(ownerId, expiration));
+        succeed = previous == null;
+      } else {
+        succeed = LOCKS.replace(entityId, content, new 
DefaultLockContent(ownerId, expiration));
+      }
+
+      if (succeed) {
+        // cleanup old heartbeat
+        if (HEARTBEATS.containsKey(entityId)) {
+          HEARTBEATS.remove(entityId).cancel(false);
+        }
+
+        HEARTBEATS.put(entityId, scheduler().scheduleAtFixedRate(() -> {
+          DefaultLockContent lastContent = LOCKS.get(entityId);
+          try {
+            long newExpiration = System.currentTimeMillis() + 
heartbeatTimeoutMs();
+            LOCKS.replace(entityId, lastContent, new 
DefaultLockContent(ownerId, newExpiration));
+          } catch (NullPointerException e) {
+            throw new RuntimeException("Cannot heartbeat to a deleted lock " + 
entityId, e);
+          }
+
+        }, 0, heartbeatIntervalMs(), TimeUnit.MILLISECONDS));
+
+      } else {
+        throw new IllegalStateException("Unable to acquire lock " + entityId);
+      }
+    }
+
+    @Override
+    public boolean acquire(String entityId, String ownerId) {
+      try {
+        Tasks.foreach(entityId)
+            .retry(Integer.MAX_VALUE - 1)
+            .onlyRetryOn(IllegalStateException.class)
+            .throwFailureWhenFinished()
+            .exponentialBackoff(acquireIntervalMs(), acquireIntervalMs(), 
acquireTimeoutMs(), 1)
+            .run(id -> acquireOnce(id, ownerId));
+        return true;
+      } catch (IllegalStateException e) {
+        return false;
+      }
+    }
+
+    @Override
+    public void release(String entityId, String ownerId) {
+      DefaultLockContent currentContent = LOCKS.get(entityId);
+      if (currentContent == null) {
+        throw new IllegalArgumentException("Cannot find lock for entity " + 
entityId);
+      }
+
+      if (!currentContent.ownerId().equals(ownerId)) {
+        throw new IllegalArgumentException(String.format(
+            "Cannot unlock %s by %s, current owner: %s", entityId, ownerId, 
currentContent.ownerId()));
+      }
+      HEARTBEATS.remove(entityId).cancel(false);
+      LOCKS.remove(entityId);
+    }
+
+    @Override
+    public void close() {

Review comment:
       I think this also needs to shut down the heartbeat thread pool. That 
should probably be done in the base class, so probably just call 
`super.close()` here and add the shutdown there.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to