This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 19b283e1bb [flink] Remove useless classloader in FlinkCatalog
19b283e1bb is described below

commit 19b283e1bb75277370f970c5277bb9d83fe930bb
Author: JingsongLi <[email protected]>
AuthorDate: Mon Dec 29 22:49:52 2025 +0800

    [flink] Remove useless classloader in FlinkCatalog
---
 .../java/org/apache/paimon/flink/FlinkCatalog.java | 29 ++++++++--------------
 .../apache/paimon/flink/FlinkCatalogFactory.java   |  8 +-----
 .../paimon/flink/FlinkGenericCatalogFactory.java   |  1 -
 3 files changed, 12 insertions(+), 26 deletions(-)

diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
index 7a777d557a..8bba96cf78 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
@@ -93,6 +93,7 @@ import org.apache.flink.table.catalog.TableChange.ModifyRefreshStatus;
 import org.apache.flink.table.catalog.TableChange.ModifyWatermark;
 import org.apache.flink.table.catalog.TableChange.ResetOption;
 import org.apache.flink.table.catalog.TableChange.SetOption;
+import org.apache.flink.table.catalog.UniqueConstraint;
 import org.apache.flink.table.catalog.WatermarkSpec;
 import org.apache.flink.table.catalog.exceptions.CatalogException;
 import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
@@ -179,23 +180,16 @@ public class FlinkCatalog extends AbstractCatalog {
 
     private static final Logger LOG = LoggerFactory.getLogger(FlinkCatalog.class);
 
-    private final ClassLoader classLoader;
     private final Catalog catalog;
     private final String name;
 
     private final boolean disableCreateTableInDefaultDatabase;
 
-    public FlinkCatalog(
-            Catalog catalog,
-            String name,
-            String defaultDatabase,
-            ClassLoader classLoader,
-            Options options) {
+    public FlinkCatalog(Catalog catalog, String name, String defaultDatabase, Options options) {
         super(name, defaultDatabase);
         LOG.info("Creating Flink catalog: metastore={}", options.get(CatalogOptions.METASTORE));
         this.catalog = catalog;
         this.name = name;
-        this.classLoader = classLoader;
         this.disableCreateTableInDefaultDatabase = options.get(DISABLE_CREATE_TABLE_IN_DEFAULT_DB);
         if (!disableCreateTableInDefaultDatabase) {
             try {
@@ -248,9 +242,9 @@ public class FlinkCatalog extends AbstractCatalog {
         Map<String, String> properties;
         if (database != null) {
             properties = new HashMap<>(database.getProperties());
-            if (database.getDescription().isPresent()
-                    && !database.getDescription().get().equals("")) {
-                properties.put(COMMENT_PROP, database.getDescription().get());
+            Optional<String> description = database.getDescription();
+            if (description.isPresent() && !description.get().isEmpty()) {
+                properties.put(COMMENT_PROP, description.get());
             }
         } else {
             properties = Collections.emptyMap();
@@ -941,7 +935,7 @@ public class FlinkCatalog extends AbstractCatalog {
         }
 
         // add primary keys
-        if (table.primaryKeys().size() > 0) {
+        if (!table.primaryKeys().isEmpty()) {
             builder.primaryKey(table.primaryKeys());
         }
 
@@ -1012,6 +1006,7 @@ public class FlinkCatalog extends AbstractCatalog {
     }
 
     public static Schema fromCatalogTable(CatalogBaseTable catalogTable) {
+        @SuppressWarnings("unchecked")
         ResolvedSchema schema =
                 ((ResolvedCatalogBaseTable<CatalogBaseTable>) catalogTable).getResolvedSchema();
         RowType rowType = (RowType) schema.toPhysicalRowDataType().getLogicalType();
@@ -1037,7 +1032,7 @@ public class FlinkCatalog extends AbstractCatalog {
                         .options(options)
                         .primaryKey(
                                 schema.getPrimaryKey()
-                                        .map(pk -> pk.getColumns())
+                                        .map(UniqueConstraint::getColumns)
                                         .orElse(Collections.emptyList()))
                         .partitionKeys(getPartitionKeys(catalogTable));
         Map<String, String> columnComments = getColumnComments(catalogTable);
@@ -1185,7 +1180,7 @@ public class FlinkCatalog extends AbstractCatalog {
     private List<PartitionEntry> getPartitionEntries(
             Table table, ObjectPath tablePath, @Nullable CatalogPartitionSpec partitionSpec)
             throws TableNotPartitionedException {
-        if (table.partitionKeys() == null || table.partitionKeys().size() == 0) {
+        if (table.partitionKeys() == null || table.partitionKeys().isEmpty()) {
             throw new TableNotPartitionedException(getName(), tablePath);
         }
 
@@ -1267,7 +1262,7 @@ public class FlinkCatalog extends AbstractCatalog {
             throws CatalogException {
         try {
             List<CatalogPartitionSpec> partitionSpecs = getPartitionSpecs(tablePath, partitionSpec);
-            return partitionSpecs.size() > 0;
+            return !partitionSpecs.isEmpty();
         } catch (TableNotPartitionedException | TableNotExistException e) {
             throw new CatalogException(e);
         }
@@ -1444,9 +1439,7 @@ public class FlinkCatalog extends AbstractCatalog {
             }
         } catch (Catalog.FunctionNotExistException e) {
             throw new FunctionNotExistException(getName(), functionPath);
-        } catch (Catalog.DefinitionAlreadyExistException e) {
-            throw new RuntimeException(e);
-        } catch (Catalog.DefinitionNotExistException e) {
+        } catch (Catalog.DefinitionAlreadyExistException | Catalog.DefinitionNotExistException e) {
             throw new RuntimeException(e);
         }
     }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalogFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalogFactory.java
index fe4f55cbe9..2933207213 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalogFactory.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalogFactory.java
@@ -63,17 +63,11 @@ public class FlinkCatalogFactory implements org.apache.flink.table.factories.Cat
                 CatalogFactory.createCatalog(context, classLoader),
                 catalogName,
                 context.options().get(DEFAULT_DATABASE),
-                classLoader,
                 context.options());
     }
 
     public static FlinkCatalog createCatalog(String catalogName, Catalog catalog, Options options) {
-        return new FlinkCatalog(
-                catalog,
-                catalogName,
-                Catalog.DEFAULT_DATABASE,
-                FlinkCatalogFactory.class.getClassLoader(),
-                options);
+        return new FlinkCatalog(catalog, catalogName, Catalog.DEFAULT_DATABASE, options);
     }
 
     public static Catalog createPaimonCatalog(Options catalogOptions) {
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkGenericCatalogFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkGenericCatalogFactory.java
index 7c3a13c6f3..6ec0d8a8df 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkGenericCatalogFactory.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkGenericCatalogFactory.java
@@ -96,7 +96,6 @@ public class FlinkGenericCatalogFactory implements CatalogFactory {
                                 CatalogContext.create(options, new FlinkFileIOLoader()), cl),
                         name,
                         options.get(DEFAULT_DATABASE),
-                        cl,
                         options);
 
         return new FlinkGenericCatalog(paimon, flinkCatalog);

Reply via email to