This is an automated email from the ASF dual-hosted git repository.
zjureel pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 9723618eb [core] Refactor catalog lock factory for paimon Factory (#2872)
9723618eb is described below
commit 9723618eb756a369a1c23ba452ca1858e1222be5
Author: Fang Yong <[email protected]>
AuthorDate: Wed Feb 21 15:16:06 2024 +0800
[core] Refactor catalog lock factory for paimon Factory (#2872)
---
.../org/apache/paimon/catalog/AbstractCatalog.java | 3 +-
.../java/org/apache/paimon/catalog/Catalog.java | 7 +++-
.../org/apache/paimon/catalog/CatalogLock.java | 8 +++--
.../apache/paimon/catalog/FileSystemCatalog.java | 2 +-
.../java/org/apache/paimon/operation/Lock.java | 18 +++++++---
.../java/org/apache/paimon/hive/HiveCatalog.java | 13 +++++---
.../org/apache/paimon/hive/HiveCatalogLock.java | 38 ++++++++++++++--------
.../apache/paimon/hive/HiveCatalogITCaseBase.java | 10 +++---
8 files changed, 66 insertions(+), 33 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
index 89fd52f6c..3c0b0063b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
@@ -304,7 +304,8 @@ public abstract class AbstractCatalog implements Catalog {
getDataTableLocation(identifier),
tableSchema,
new CatalogEnvironment(
- Lock.factory(lockFactory().orElse(null), identifier),
+ Lock.factory(
+ lockFactory().orElse(null), lockContext().orElse(null), identifier),
metastoreClientFactory(identifier).orElse(null),
lineageMetaFactory));
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
index 005f5b16a..0168891b0 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
@@ -49,7 +49,12 @@ public interface Catalog extends AutoCloseable {
* Get lock factory from catalog. Lock is used to support multiple concurrent writes on the
* object store.
*/
- Optional<CatalogLock.Factory> lockFactory();
+ Optional<CatalogLock.LockFactory> lockFactory();
+
+ /** Get lock context for lock factory to create a lock. */
+ default Optional<CatalogLock.LockContext> lockContext() {
+ return Optional.empty();
+ }
/** Get metastore client factory for the table specified by {@code identifier}. */
default Optional<MetastoreClient.Factory> metastoreClientFactory(Identifier identifier) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogLock.java b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogLock.java
index 278b3ad63..0e547037e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogLock.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogLock.java
@@ -19,6 +19,7 @@
package org.apache.paimon.catalog;
import org.apache.paimon.annotation.Public;
+import org.apache.paimon.factories.Factory;
import java.io.Closeable;
import java.io.Serializable;
@@ -36,7 +37,10 @@ public interface CatalogLock extends Closeable {
<T> T runWithLock(String database, String table, Callable<T> callable) throws Exception;
/** Factory to create {@link CatalogLock}. */
- interface Factory extends Serializable {
- CatalogLock create();
+ interface LockFactory extends Factory, Serializable {
+ CatalogLock create(LockContext context);
}
+
+ /** Context for lock factory to create lock. */
+ interface LockContext extends Serializable {}
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java
index e458dad7c..0c01e9cb7 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java
@@ -57,7 +57,7 @@ public class FileSystemCatalog extends AbstractCatalog {
}
@Override
- public Optional<CatalogLock.Factory> lockFactory() {
+ public Optional<CatalogLock.LockFactory> lockFactory() {
return Optional.empty();
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/Lock.java b/paimon-core/src/main/java/org/apache/paimon/operation/Lock.java
index 499a7ca6a..a9f27e70a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/Lock.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/Lock.java
@@ -43,10 +43,13 @@ public interface Lock extends AutoCloseable {
Lock create();
}
- static Factory factory(@Nullable CatalogLock.Factory lockFactory, Identifier tablePath) {
+ static Factory factory(
+ @Nullable CatalogLock.LockFactory lockFactory,
+ @Nullable CatalogLock.LockContext lockContext,
+ Identifier tablePath) {
return lockFactory == null
? new EmptyFactory()
- : new CatalogLockFactory(lockFactory, tablePath);
+ : new CatalogLockFactory(lockFactory, lockContext, tablePath);
}
static Factory emptyFactory() {
@@ -58,17 +61,22 @@ public interface Lock extends AutoCloseable {
private static final long serialVersionUID = 1L;
- private final CatalogLock.Factory lockFactory;
+ private final CatalogLock.LockFactory lockFactory;
+ private final CatalogLock.LockContext lockContext;
private final Identifier tablePath;
- public CatalogLockFactory(CatalogLock.Factory lockFactory, Identifier tablePath) {
+ public CatalogLockFactory(
+ CatalogLock.LockFactory lockFactory,
+ CatalogLock.LockContext lockContext,
+ Identifier tablePath) {
this.lockFactory = lockFactory;
+ this.lockContext = lockContext;
this.tablePath = tablePath;
}
@Override
public Lock create() {
- return fromCatalog(lockFactory.create(), tablePath);
+ return fromCatalog(lockFactory.create(lockContext), tablePath);
}
}
diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
index 7d50b7e8b..07463302c 100644
--- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
+++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
@@ -148,10 +148,15 @@ public class HiveCatalog extends AbstractCatalog {
}
@Override
- public Optional<CatalogLock.Factory> lockFactory() {
- return lockEnabled()
- ? Optional.of(HiveCatalogLock.createFactory(hiveConf, clientClassName))
- : Optional.empty();
+ public Optional<CatalogLock.LockFactory> lockFactory() {
+ return lockEnabled() ? Optional.of(HiveCatalogLock.createFactory()) : Optional.empty();
+ }
+
+ @Override
+ public Optional<CatalogLock.LockContext> lockContext() {
+ return Optional.of(
+ new HiveCatalogLock.HiveLockContext(
+ new SerializableHiveConf(hiveConf), clientClassName));
}
private boolean lockEnabled() {
diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogLock.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogLock.java
index 0f6cd4837..1635d8096 100644
--- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogLock.java
+++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogLock.java
@@ -39,6 +39,7 @@ import java.util.concurrent.Callable;
import static org.apache.paimon.options.CatalogOptions.LOCK_ACQUIRE_TIMEOUT;
import static org.apache.paimon.options.CatalogOptions.LOCK_CHECK_MAX_SLEEP;
+import static org.apache.paimon.utils.Preconditions.checkArgument;
/** Hive {@link CatalogLock}. */
public class HiveCatalogLock implements CatalogLock {
@@ -112,30 +113,31 @@ public class HiveCatalogLock implements CatalogLock {
}
/** Create a hive lock factory. */
- public static CatalogLock.Factory createFactory(HiveConf hiveConf, String clientClassName) {
- return new HiveCatalogLockFactory(hiveConf, clientClassName);
+ public static LockFactory createFactory() {
+ return new HiveCatalogLockFactory();
}
- private static class HiveCatalogLockFactory implements CatalogLock.Factory {
+ private static class HiveCatalogLockFactory implements LockFactory {
private static final long serialVersionUID = 1L;
- private final SerializableHiveConf hiveConf;
- private final String clientClassName;
-
- public HiveCatalogLockFactory(HiveConf hiveConf, String clientClassName) {
- this.hiveConf = new SerializableHiveConf(hiveConf);
- this.clientClassName = clientClassName;
- }
+ private static final String IDENTIFIER = "hive";
@Override
- public CatalogLock create() {
- HiveConf conf = hiveConf.conf();
+ public CatalogLock create(LockContext context) {
+ checkArgument(context instanceof HiveLockContext);
+ HiveLockContext hiveLockContext = (HiveLockContext) context;
+ HiveConf conf = hiveLockContext.hiveConf.conf();
return new HiveCatalogLock(
- HiveCatalog.createClient(conf, clientClassName),
+ HiveCatalog.createClient(conf, hiveLockContext.clientClassName),
checkMaxSleep(conf),
acquireTimeout(conf));
}
+
+ @Override
+ public String identifier() {
+ return IDENTIFIER;
+ }
}
public static long checkMaxSleep(HiveConf conf) {
@@ -153,4 +155,14 @@ public class HiveCatalogLock implements CatalogLock {
TimeUtils.getStringInMillis(LOCK_ACQUIRE_TIMEOUT.defaultValue())))
.toMillis();
}
+
+ static class HiveLockContext implements LockContext {
+ private final SerializableHiveConf hiveConf;
+ private final String clientClassName;
+
+ public HiveLockContext(SerializableHiveConf hiveConf, String clientClassName) {
+ this.hiveConf = hiveConf;
+ this.clientClassName = clientClassName;
+ }
+ }
}
diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
index 051a23979..939435560 100644
--- a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
+++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
@@ -689,11 +689,9 @@ public abstract class HiveCatalogITCaseBase {
@Test
public void testHiveLock() throws InterruptedException {
tEnv.executeSql("CREATE TABLE t (a INT)");
- CatalogLock.Factory lockFactory =
- ((FlinkCatalog) tEnv.getCatalog(tEnv.getCurrentCatalog()).get())
- .catalog()
- .lockFactory()
- .get();
+ Catalog catalog =
+ ((FlinkCatalog) tEnv.getCatalog(tEnv.getCurrentCatalog()).get()).catalog();
+ CatalogLock.LockFactory lockFactory = catalog.lockFactory().get();
AtomicInteger count = new AtomicInteger(0);
List<Thread> threads = new ArrayList<>();
@@ -708,7 +706,7 @@ public abstract class HiveCatalogITCaseBase {
Thread thread =
new Thread(
() -> {
- CatalogLock lock = lockFactory.create();
+ CatalogLock lock = lockFactory.create(catalog.lockContext().get());
for (int j = 0; j < 10; j++) {
try {
lock.runWithLock("test_db", "t", unsafeIncrement);