This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new aec141dc8 [flink] Optimize the getLakeTableFactory in LakeTableFactory (#1816)
aec141dc8 is described below

commit aec141dc8414217c833758956a6488ade1dad160
Author: yuxia Luo <[email protected]>
AuthorDate: Thu Oct 16 13:37:33 2025 +0800

    [flink] Optimize the getLakeTableFactory in LakeTableFactory (#1816)
---
 .../apache/fluss/flink/catalog/FlinkCatalog.java   | 10 +--
 .../fluss/flink/catalog/FlinkTableFactory.java     | 10 +--
 .../{LakeCatalog.java => LakeFlinkCatalog.java}    | 80 ++++++++++++----------
 .../apache/fluss/flink/lake/LakeTableFactory.java  | 62 ++++++-----------
 .../flink/FlinkUnionReadPrimaryKeyTableITCase.java |  9 +--
 5 files changed, 75 insertions(+), 96 deletions(-)

diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java
index 3ecb01541..7e10c4a5c 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java
@@ -23,7 +23,7 @@ import org.apache.fluss.client.admin.Admin;
 import org.apache.fluss.config.ConfigOptions;
 import org.apache.fluss.config.Configuration;
 import org.apache.fluss.exception.InvalidTableException;
-import org.apache.fluss.flink.lake.LakeCatalog;
+import org.apache.fluss.flink.lake.LakeFlinkCatalog;
 import org.apache.fluss.flink.procedure.ProcedureManager;
 import org.apache.fluss.flink.utils.CatalogExceptionUtils;
 import org.apache.fluss.flink.utils.FlinkConversions;
@@ -114,7 +114,7 @@ public class FlinkCatalog extends AbstractCatalog {
     protected final String defaultDatabase;
     protected final String bootstrapServers;
     protected final Map<String, String> securityConfigs;
-    private final LakeCatalog lakeCatalog;
+    private final LakeFlinkCatalog lakeFlinkCatalog;
     protected Connection connection;
     protected Admin admin;
 
@@ -130,12 +130,12 @@ public class FlinkCatalog extends AbstractCatalog {
         this.bootstrapServers = bootstrapServers;
         this.classLoader = classLoader;
         this.securityConfigs = securityConfigs;
-        this.lakeCatalog = new LakeCatalog(catalogName, classLoader);
+        this.lakeFlinkCatalog = new LakeFlinkCatalog(catalogName, classLoader);
     }
 
     @Override
     public Optional<Factory> getFactory() {
-        return Optional.of(new FlinkTableFactory(lakeCatalog));
+        return Optional.of(new FlinkTableFactory(lakeFlinkCatalog));
     }
 
     @Override
@@ -340,7 +340,7 @@ public class FlinkCatalog extends AbstractCatalog {
             // Need to reconstruct: table_name + $snapshots
             tableName = String.join("", tableComponents);
         }
-        return lakeCatalog
+        return lakeFlinkCatalog
                 .getLakeCatalog(properties)
                 .getTable(new ObjectPath(databaseName, tableName));
     }
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java
index 0b0a59981..57ead2c9e 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java
@@ -20,7 +20,7 @@ package org.apache.fluss.flink.catalog;
 import org.apache.fluss.config.ConfigOptions;
 import org.apache.fluss.config.Configuration;
 import org.apache.fluss.flink.FlinkConnectorOptions;
-import org.apache.fluss.flink.lake.LakeCatalog;
+import org.apache.fluss.flink.lake.LakeFlinkCatalog;
 import org.apache.fluss.flink.lake.LakeTableFactory;
 import org.apache.fluss.flink.sink.FlinkTableSink;
 import org.apache.fluss.flink.source.FlinkTableSource;
@@ -69,11 +69,11 @@ import static org.apache.fluss.flink.utils.FlinkConversions.toFlinkOption;
 /** Factory to create table source and table sink for Fluss. */
 public class FlinkTableFactory implements DynamicTableSourceFactory, 
DynamicTableSinkFactory {
 
-    private final LakeCatalog lakeCatalog;
+    private final LakeFlinkCatalog lakeFlinkCatalog;
     private volatile LakeTableFactory lakeTableFactory;
 
-    public FlinkTableFactory(LakeCatalog lakeCatalog) {
-        this.lakeCatalog = lakeCatalog;
+    public FlinkTableFactory(LakeFlinkCatalog lakeFlinkCatalog) {
+        this.lakeFlinkCatalog = lakeFlinkCatalog;
     }
 
     @Override
@@ -257,7 +257,7 @@ public class FlinkTableFactory implements DynamicTableSourceFactory, DynamicTabl
         if (lakeTableFactory == null) {
             synchronized (this) {
                 if (lakeTableFactory == null) {
-                    lakeTableFactory = new LakeTableFactory(lakeCatalog);
+                    lakeTableFactory = new LakeTableFactory(lakeFlinkCatalog);
                 }
             }
         }
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeCatalog.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeFlinkCatalog.java
similarity index 68%
rename from fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeCatalog.java
rename to fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeFlinkCatalog.java
index e1f8096e6..3eb0db5d9 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeCatalog.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeFlinkCatalog.java
@@ -21,7 +21,6 @@ import org.apache.fluss.config.ConfigOptions;
 import org.apache.fluss.config.Configuration;
 import org.apache.fluss.flink.utils.DataLakeUtils;
 import org.apache.fluss.metadata.DataLakeFormat;
-import org.apache.fluss.utils.MapUtils;
 
 import org.apache.flink.table.catalog.Catalog;
 import org.apache.paimon.catalog.CatalogContext;
@@ -34,59 +33,64 @@ import java.util.Map;
 
 import static org.apache.fluss.metadata.DataLakeFormat.ICEBERG;
 import static org.apache.fluss.metadata.DataLakeFormat.PAIMON;
+import static org.apache.fluss.utils.Preconditions.checkNotNull;
 
-/** A lake catalog to delegate the operations on lake table. */
-public class LakeCatalog {
-    private static final Map<DataLakeFormat, Catalog> LAKE_CATALOG_CACHE =
-            MapUtils.newConcurrentHashMap();
+/** A lake flink catalog to delegate the operations on lake table. */
+public class LakeFlinkCatalog {
 
     private final String catalogName;
     private final ClassLoader classLoader;
 
-    public LakeCatalog(String catalogName, ClassLoader classLoader) {
+    private volatile Catalog catalog;
+    private volatile DataLakeFormat lakeFormat;
+
+    public LakeFlinkCatalog(String catalogName, ClassLoader classLoader) {
         this.catalogName = catalogName;
         this.classLoader = classLoader;
     }
 
     public Catalog getLakeCatalog(Configuration tableOptions) {
-        DataLakeFormat lakeFormat = 
tableOptions.get(ConfigOptions.TABLE_DATALAKE_FORMAT);
-        if (lakeFormat == null) {
-            throw new IllegalArgumentException(
-                    "DataLake format is not specified in table options. "
-                            + "Please ensure '"
-                            + ConfigOptions.TABLE_DATALAKE_FORMAT.key()
-                            + "' is set.");
-        }
-        return LAKE_CATALOG_CACHE.computeIfAbsent(
-                lakeFormat,
-                (dataLakeFormat) -> {
-                    if (dataLakeFormat == PAIMON) {
-                        return PaimonCatalogFactory.create(catalogName, 
tableOptions, classLoader);
-                    } else if (dataLakeFormat == ICEBERG) {
-                        return IcebergCatalogFactory.create(catalogName, 
tableOptions);
+        // TODO: Currently, a Fluss cluster only supports a single DataLake 
storage.
+        // However, in the
+        //  future, it may support multiple DataLakes. The following code 
assumes
+        // that a single
+        //  lakeCatalog is shared across multiple tables, which will no longer 
be
+        // valid in such
+        //  cases and should be updated accordingly.
+        if (catalog == null) {
+            synchronized (this) {
+                if (catalog == null) {
+                    DataLakeFormat lakeFormat =
+                            
tableOptions.get(ConfigOptions.TABLE_DATALAKE_FORMAT);
+                    if (lakeFormat == null) {
+                        throw new IllegalArgumentException(
+                                "DataLake format is not specified in table 
options. "
+                                        + "Please ensure '"
+                                        + 
ConfigOptions.TABLE_DATALAKE_FORMAT.key()
+                                        + "' is set.");
+                    }
+                    if (lakeFormat == PAIMON) {
+                        catalog =
+                                PaimonCatalogFactory.create(catalogName, 
tableOptions, classLoader);
+                        this.lakeFormat = PAIMON;
+                    } else if (lakeFormat == ICEBERG) {
+                        catalog = IcebergCatalogFactory.create(catalogName, 
tableOptions);
+                        this.lakeFormat = ICEBERG;
                     } else {
                         throw new UnsupportedOperationException(
-                                "Unsupported datalake format: " + 
dataLakeFormat);
+                                "Unsupported data lake format: " + lakeFormat);
                     }
-                });
+                }
+            }
+        }
+        return catalog;
     }
 
-    public Catalog getLakeCatalog(Configuration tableOptions, DataLakeFormat 
lakeFormat) {
-        if (lakeFormat == null) {
-            throw new IllegalArgumentException("DataLake format cannot be 
null");
-        }
-        return LAKE_CATALOG_CACHE.computeIfAbsent(
+    public DataLakeFormat getLakeFormat() {
+        checkNotNull(
                 lakeFormat,
-                (dataLakeFormat) -> {
-                    if (dataLakeFormat == PAIMON) {
-                        return PaimonCatalogFactory.create(catalogName, 
tableOptions, classLoader);
-                    } else if (dataLakeFormat == ICEBERG) {
-                        return IcebergCatalogFactory.create(catalogName, 
tableOptions);
-                    } else {
-                        throw new UnsupportedOperationException(
-                                "Unsupported datalake format: " + 
dataLakeFormat);
-                    }
-                });
+                "DataLake format is null, must call getLakeCatalog first to 
initialize lake format.");
+        return lakeFormat;
     }
 
     /**
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeTableFactory.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeTableFactory.java
index 93120e9e2..3f0ff88c6 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeTableFactory.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeTableFactory.java
@@ -17,20 +17,20 @@
 
 package org.apache.fluss.flink.lake;
 
+import org.apache.fluss.config.Configuration;
+
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.factories.DynamicTableFactory;
 import org.apache.flink.table.factories.DynamicTableSourceFactory;
 import org.apache.flink.table.factories.FactoryUtil;
 
-import java.util.Map;
-
 /** A factory to create {@link DynamicTableSource} for lake table. */
 public class LakeTableFactory {
-    private final LakeCatalog lakeCatalog;
+    private final LakeFlinkCatalog lakeFlinkCatalog;
 
-    public LakeTableFactory(LakeCatalog lakeCatalog) {
-        this.lakeCatalog = lakeCatalog;
+    public LakeTableFactory(LakeFlinkCatalog lakeFlinkCatalog) {
+        this.lakeFlinkCatalog = lakeFlinkCatalog;
     }
 
     public DynamicTableSource createDynamicTableSource(
@@ -42,21 +42,6 @@ public class LakeTableFactory {
                         originIdentifier.getDatabaseName(),
                         tableName);
 
-        // Determine the lake format from the table options
-        Map<String, String> tableOptions = 
context.getCatalogTable().getOptions();
-
-        // If not present, fallback to 'fluss.table.datalake.format' (set by 
Fluss)
-        String connector = tableOptions.get("connector");
-        if (connector == null) {
-            connector = tableOptions.get("fluss.table.datalake.format");
-        }
-
-        if (connector == null) {
-            // For Paimon system tables (like table_name$options), the table 
options are empty
-            // Default to Paimon for backward compatibility
-            connector = "paimon";
-        }
-
         // For Iceberg and Paimon, pass the table name as-is to their factory.
         // Metadata tables will be handled internally by their respective 
factories.
         DynamicTableFactory.Context newContext =
@@ -69,21 +54,21 @@ public class LakeTableFactory {
                         context.isTemporary());
 
         // Get the appropriate factory based on connector type
-        DynamicTableSourceFactory factory = getLakeTableFactory(connector, 
tableOptions);
+        DynamicTableSourceFactory factory = getLakeTableFactory();
         return factory.createDynamicTableSource(newContext);
     }
 
-    private DynamicTableSourceFactory getLakeTableFactory(
-            String connector, Map<String, String> tableOptions) {
-        if ("paimon".equalsIgnoreCase(connector)) {
-            return getPaimonFactory();
-        } else if ("iceberg".equalsIgnoreCase(connector)) {
-            return getIcebergFactory(tableOptions);
-        } else {
-            throw new UnsupportedOperationException(
-                    "Unsupported lake connector: "
-                            + connector
-                            + ". Only 'paimon' and 'iceberg' are supported.");
+    private DynamicTableSourceFactory getLakeTableFactory() {
+        switch (lakeFlinkCatalog.getLakeFormat()) {
+            case PAIMON:
+                return getPaimonFactory();
+            case ICEBERG:
+                return getIcebergFactory();
+            default:
+                throw new UnsupportedOperationException(
+                        "Unsupported lake connector: "
+                                + lakeFlinkCatalog.getLakeFormat()
+                                + ". Only 'paimon' and 'iceberg' are 
supported.");
         }
     }
 
@@ -91,22 +76,19 @@ public class LakeTableFactory {
         return new org.apache.paimon.flink.FlinkTableFactory();
     }
 
-    private DynamicTableSourceFactory getIcebergFactory(Map<String, String> 
tableOptions) {
+    private DynamicTableSourceFactory getIcebergFactory() {
         try {
-            // Get the Iceberg FlinkCatalog instance from LakeCatalog
-            org.apache.fluss.config.Configuration flussConfig =
-                    
org.apache.fluss.config.Configuration.fromMap(tableOptions);
-
             // Get catalog with explicit ICEBERG format
             org.apache.flink.table.catalog.Catalog catalog =
-                    lakeCatalog.getLakeCatalog(
-                            flussConfig, 
org.apache.fluss.metadata.DataLakeFormat.ICEBERG);
+                    lakeFlinkCatalog.getLakeCatalog(
+                            // we can pass empty configuration to get catalog
+                            // since the catalog should already be initialized
+                            new Configuration());
 
             // Create FlinkDynamicTableFactory with the catalog
             Class<?> icebergFactoryClass =
                     
Class.forName("org.apache.iceberg.flink.FlinkDynamicTableFactory");
             Class<?> flinkCatalogClass = 
Class.forName("org.apache.iceberg.flink.FlinkCatalog");
-
             return (DynamicTableSourceFactory)
                     icebergFactoryClass
                             .getDeclaredConstructor(flinkCatalogClass)
diff --git a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkUnionReadPrimaryKeyTableITCase.java b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkUnionReadPrimaryKeyTableITCase.java
index 0ff38190c..0518290a5 100644
--- a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkUnionReadPrimaryKeyTableITCase.java
+++ b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkUnionReadPrimaryKeyTableITCase.java
@@ -278,7 +278,7 @@ public class FlinkUnionReadPrimaryKeyTableITCase extends FlinkUnionReadTestBase
 
     @ParameterizedTest
     @ValueSource(booleans = {false, true})
-    void testReadIcebergLakeTableAndSystemTable(boolean isPartitioned) throws 
Exception {
+    void testReadIcebergLakeTable(boolean isPartitioned) throws Exception {
         // first of all, start tiering
         JobClient jobClient = buildTieringJob(execEnv);
 
@@ -308,13 +308,6 @@ public class FlinkUnionReadPrimaryKeyTableITCase extends FlinkUnionReadTestBase
         int expectedUserRowCount = isPartitioned ? 2 * 
waitUntilPartitions(t1).size() : 2;
         assertThat(icebergRows).hasSize(expectedUserRowCount);
 
-        // verify rows have expected number of columns
-        int userColumnCount = 
lakeTableResult.getResolvedSchema().getColumnCount();
-        Row firstRow = icebergRows.get(0);
-        assertThat(firstRow.getArity())
-                .as("Iceberg row should have at least user columns")
-                .isGreaterThanOrEqualTo(userColumnCount);
-
         // Test 2: Read Iceberg system table (snapshots) using $lake$snapshots 
suffix
         TableResult snapshotsResult =
                 batchTEnv.executeSql(String.format("select * from 
%s$lake$snapshots", tableName));

Reply via email to