This is an automated email from the ASF dual-hosted git repository.

zjureel pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 9723618eb [core] Refactor catalog lock factory for paimon Factory 
(#2872)
9723618eb is described below

commit 9723618eb756a369a1c23ba452ca1858e1222be5
Author: Fang Yong <[email protected]>
AuthorDate: Wed Feb 21 15:16:06 2024 +0800

    [core] Refactor catalog lock factory for paimon Factory (#2872)
---
 .../org/apache/paimon/catalog/AbstractCatalog.java |  3 +-
 .../java/org/apache/paimon/catalog/Catalog.java    |  7 +++-
 .../org/apache/paimon/catalog/CatalogLock.java     |  8 +++--
 .../apache/paimon/catalog/FileSystemCatalog.java   |  2 +-
 .../java/org/apache/paimon/operation/Lock.java     | 18 +++++++---
 .../java/org/apache/paimon/hive/HiveCatalog.java   | 13 +++++---
 .../org/apache/paimon/hive/HiveCatalogLock.java    | 38 ++++++++++++++--------
 .../apache/paimon/hive/HiveCatalogITCaseBase.java  | 10 +++---
 8 files changed, 66 insertions(+), 33 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java 
b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
index 89fd52f6c..3c0b0063b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
@@ -304,7 +304,8 @@ public abstract class AbstractCatalog implements Catalog {
                 getDataTableLocation(identifier),
                 tableSchema,
                 new CatalogEnvironment(
-                        Lock.factory(lockFactory().orElse(null), identifier),
+                        Lock.factory(
+                                lockFactory().orElse(null), 
lockContext().orElse(null), identifier),
                         metastoreClientFactory(identifier).orElse(null),
                         lineageMetaFactory));
     }
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java 
b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
index 005f5b16a..0168891b0 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
@@ -49,7 +49,12 @@ public interface Catalog extends AutoCloseable {
      * Get lock factory from catalog. Lock is used to support multiple 
concurrent writes on the
      * object store.
      */
-    Optional<CatalogLock.Factory> lockFactory();
+    Optional<CatalogLock.LockFactory> lockFactory();
+
+    /** Get lock context for lock factory to create a lock. */
+    default Optional<CatalogLock.LockContext> lockContext() {
+        return Optional.empty();
+    }
 
     /** Get metastore client factory for the table specified by {@code 
identifier}. */
     default Optional<MetastoreClient.Factory> 
metastoreClientFactory(Identifier identifier) {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogLock.java 
b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogLock.java
index 278b3ad63..0e547037e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogLock.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogLock.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.catalog;
 
 import org.apache.paimon.annotation.Public;
+import org.apache.paimon.factories.Factory;
 
 import java.io.Closeable;
 import java.io.Serializable;
@@ -36,7 +37,10 @@ public interface CatalogLock extends Closeable {
     <T> T runWithLock(String database, String table, Callable<T> callable) 
throws Exception;
 
     /** Factory to create {@link CatalogLock}. */
-    interface Factory extends Serializable {
-        CatalogLock create();
+    interface LockFactory extends Factory, Serializable {
+        CatalogLock create(LockContext context);
     }
+
+    /** Context for lock factory to create lock. */
+    interface LockContext extends Serializable {}
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java 
b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java
index e458dad7c..0c01e9cb7 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java
@@ -57,7 +57,7 @@ public class FileSystemCatalog extends AbstractCatalog {
     }
 
     @Override
-    public Optional<CatalogLock.Factory> lockFactory() {
+    public Optional<CatalogLock.LockFactory> lockFactory() {
         return Optional.empty();
     }
 
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/Lock.java 
b/paimon-core/src/main/java/org/apache/paimon/operation/Lock.java
index 499a7ca6a..a9f27e70a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/Lock.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/Lock.java
@@ -43,10 +43,13 @@ public interface Lock extends AutoCloseable {
         Lock create();
     }
 
-    static Factory factory(@Nullable CatalogLock.Factory lockFactory, 
Identifier tablePath) {
+    static Factory factory(
+            @Nullable CatalogLock.LockFactory lockFactory,
+            @Nullable CatalogLock.LockContext lockContext,
+            Identifier tablePath) {
         return lockFactory == null
                 ? new EmptyFactory()
-                : new CatalogLockFactory(lockFactory, tablePath);
+                : new CatalogLockFactory(lockFactory, lockContext, tablePath);
     }
 
     static Factory emptyFactory() {
@@ -58,17 +61,22 @@ public interface Lock extends AutoCloseable {
 
         private static final long serialVersionUID = 1L;
 
-        private final CatalogLock.Factory lockFactory;
+        private final CatalogLock.LockFactory lockFactory;
+        private final CatalogLock.LockContext lockContext;
         private final Identifier tablePath;
 
-        public CatalogLockFactory(CatalogLock.Factory lockFactory, Identifier 
tablePath) {
+        public CatalogLockFactory(
+                CatalogLock.LockFactory lockFactory,
+                CatalogLock.LockContext lockContext,
+                Identifier tablePath) {
             this.lockFactory = lockFactory;
+            this.lockContext = lockContext;
             this.tablePath = tablePath;
         }
 
         @Override
         public Lock create() {
-            return fromCatalog(lockFactory.create(), tablePath);
+            return fromCatalog(lockFactory.create(lockContext), tablePath);
         }
     }
 
diff --git 
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
 
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
index 7d50b7e8b..07463302c 100644
--- 
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
+++ 
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
@@ -148,10 +148,15 @@ public class HiveCatalog extends AbstractCatalog {
     }
 
     @Override
-    public Optional<CatalogLock.Factory> lockFactory() {
-        return lockEnabled()
-                ? Optional.of(HiveCatalogLock.createFactory(hiveConf, 
clientClassName))
-                : Optional.empty();
+    public Optional<CatalogLock.LockFactory> lockFactory() {
+        return lockEnabled() ? Optional.of(HiveCatalogLock.createFactory()) : 
Optional.empty();
+    }
+
+    @Override
+    public Optional<CatalogLock.LockContext> lockContext() {
+        return Optional.of(
+                new HiveCatalogLock.HiveLockContext(
+                        new SerializableHiveConf(hiveConf), clientClassName));
     }
 
     private boolean lockEnabled() {
diff --git 
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogLock.java
 
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogLock.java
index 0f6cd4837..1635d8096 100644
--- 
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogLock.java
+++ 
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogLock.java
@@ -39,6 +39,7 @@ import java.util.concurrent.Callable;
 
 import static org.apache.paimon.options.CatalogOptions.LOCK_ACQUIRE_TIMEOUT;
 import static org.apache.paimon.options.CatalogOptions.LOCK_CHECK_MAX_SLEEP;
+import static org.apache.paimon.utils.Preconditions.checkArgument;
 
 /** Hive {@link CatalogLock}. */
 public class HiveCatalogLock implements CatalogLock {
@@ -112,30 +113,31 @@ public class HiveCatalogLock implements CatalogLock {
     }
 
     /** Create a hive lock factory. */
-    public static CatalogLock.Factory createFactory(HiveConf hiveConf, String 
clientClassName) {
-        return new HiveCatalogLockFactory(hiveConf, clientClassName);
+    public static LockFactory createFactory() {
+        return new HiveCatalogLockFactory();
     }
 
-    private static class HiveCatalogLockFactory implements CatalogLock.Factory 
{
+    private static class HiveCatalogLockFactory implements LockFactory {
 
         private static final long serialVersionUID = 1L;
 
-        private final SerializableHiveConf hiveConf;
-        private final String clientClassName;
-
-        public HiveCatalogLockFactory(HiveConf hiveConf, String 
clientClassName) {
-            this.hiveConf = new SerializableHiveConf(hiveConf);
-            this.clientClassName = clientClassName;
-        }
+        private static final String IDENTIFIER = "hive";
 
         @Override
-        public CatalogLock create() {
-            HiveConf conf = hiveConf.conf();
+        public CatalogLock create(LockContext context) {
+            checkArgument(context instanceof HiveLockContext);
+            HiveLockContext hiveLockContext = (HiveLockContext) context;
+            HiveConf conf = hiveLockContext.hiveConf.conf();
             return new HiveCatalogLock(
-                    HiveCatalog.createClient(conf, clientClassName),
+                    HiveCatalog.createClient(conf, 
hiveLockContext.clientClassName),
                     checkMaxSleep(conf),
                     acquireTimeout(conf));
         }
+
+        @Override
+        public String identifier() {
+            return IDENTIFIER;
+        }
     }
 
     public static long checkMaxSleep(HiveConf conf) {
@@ -153,4 +155,14 @@ public class HiveCatalogLock implements CatalogLock {
                                 
TimeUtils.getStringInMillis(LOCK_ACQUIRE_TIMEOUT.defaultValue())))
                 .toMillis();
     }
+
+    static class HiveLockContext implements LockContext {
+        private final SerializableHiveConf hiveConf;
+        private final String clientClassName;
+
+        public HiveLockContext(SerializableHiveConf hiveConf, String 
clientClassName) {
+            this.hiveConf = hiveConf;
+            this.clientClassName = clientClassName;
+        }
+    }
 }
diff --git 
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
 
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
index 051a23979..939435560 100644
--- 
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
+++ 
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
@@ -689,11 +689,9 @@ public abstract class HiveCatalogITCaseBase {
     @Test
     public void testHiveLock() throws InterruptedException {
         tEnv.executeSql("CREATE TABLE t (a INT)");
-        CatalogLock.Factory lockFactory =
-                ((FlinkCatalog) 
tEnv.getCatalog(tEnv.getCurrentCatalog()).get())
-                        .catalog()
-                        .lockFactory()
-                        .get();
+        Catalog catalog =
+                ((FlinkCatalog) 
tEnv.getCatalog(tEnv.getCurrentCatalog()).get()).catalog();
+        CatalogLock.LockFactory lockFactory = catalog.lockFactory().get();
 
         AtomicInteger count = new AtomicInteger(0);
         List<Thread> threads = new ArrayList<>();
@@ -708,7 +706,7 @@ public abstract class HiveCatalogITCaseBase {
             Thread thread =
                     new Thread(
                             () -> {
-                                CatalogLock lock = lockFactory.create();
+                                CatalogLock lock = 
lockFactory.create(catalog.lockContext().get());
                                 for (int j = 0; j < 10; j++) {
                                     try {
                                         lock.runWithLock("test_db", "t", 
unsafeIncrement);

Reply via email to