This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 19b283e1bb [flink] Remove useless classloader in FlinkCatalog
19b283e1bb is described below
commit 19b283e1bb75277370f970c5277bb9d83fe930bb
Author: JingsongLi <[email protected]>
AuthorDate: Mon Dec 29 22:49:52 2025 +0800
[flink] Remove useless classloader in FlinkCatalog
---
.../java/org/apache/paimon/flink/FlinkCatalog.java | 29 ++++++++--------------
.../apache/paimon/flink/FlinkCatalogFactory.java | 8 +-----
.../paimon/flink/FlinkGenericCatalogFactory.java | 1 -
3 files changed, 12 insertions(+), 26 deletions(-)
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
index 7a777d557a..8bba96cf78 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
@@ -93,6 +93,7 @@ import
org.apache.flink.table.catalog.TableChange.ModifyRefreshStatus;
import org.apache.flink.table.catalog.TableChange.ModifyWatermark;
import org.apache.flink.table.catalog.TableChange.ResetOption;
import org.apache.flink.table.catalog.TableChange.SetOption;
+import org.apache.flink.table.catalog.UniqueConstraint;
import org.apache.flink.table.catalog.WatermarkSpec;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
@@ -179,23 +180,16 @@ public class FlinkCatalog extends AbstractCatalog {
private static final Logger LOG =
LoggerFactory.getLogger(FlinkCatalog.class);
- private final ClassLoader classLoader;
private final Catalog catalog;
private final String name;
private final boolean disableCreateTableInDefaultDatabase;
- public FlinkCatalog(
- Catalog catalog,
- String name,
- String defaultDatabase,
- ClassLoader classLoader,
- Options options) {
+ public FlinkCatalog(Catalog catalog, String name, String defaultDatabase,
Options options) {
super(name, defaultDatabase);
LOG.info("Creating Flink catalog: metastore={}",
options.get(CatalogOptions.METASTORE));
this.catalog = catalog;
this.name = name;
- this.classLoader = classLoader;
this.disableCreateTableInDefaultDatabase =
options.get(DISABLE_CREATE_TABLE_IN_DEFAULT_DB);
if (!disableCreateTableInDefaultDatabase) {
try {
@@ -248,9 +242,9 @@ public class FlinkCatalog extends AbstractCatalog {
Map<String, String> properties;
if (database != null) {
properties = new HashMap<>(database.getProperties());
- if (database.getDescription().isPresent()
- && !database.getDescription().get().equals("")) {
- properties.put(COMMENT_PROP, database.getDescription().get());
+ Optional<String> description = database.getDescription();
+ if (description.isPresent() && !description.get().isEmpty()) {
+ properties.put(COMMENT_PROP, description.get());
}
} else {
properties = Collections.emptyMap();
@@ -941,7 +935,7 @@ public class FlinkCatalog extends AbstractCatalog {
}
// add primary keys
- if (table.primaryKeys().size() > 0) {
+ if (!table.primaryKeys().isEmpty()) {
builder.primaryKey(table.primaryKeys());
}
@@ -1012,6 +1006,7 @@ public class FlinkCatalog extends AbstractCatalog {
}
public static Schema fromCatalogTable(CatalogBaseTable catalogTable) {
+ @SuppressWarnings("unchecked")
ResolvedSchema schema =
((ResolvedCatalogBaseTable<CatalogBaseTable>)
catalogTable).getResolvedSchema();
RowType rowType = (RowType)
schema.toPhysicalRowDataType().getLogicalType();
@@ -1037,7 +1032,7 @@ public class FlinkCatalog extends AbstractCatalog {
.options(options)
.primaryKey(
schema.getPrimaryKey()
- .map(pk -> pk.getColumns())
+ .map(UniqueConstraint::getColumns)
.orElse(Collections.emptyList()))
.partitionKeys(getPartitionKeys(catalogTable));
Map<String, String> columnComments = getColumnComments(catalogTable);
@@ -1185,7 +1180,7 @@ public class FlinkCatalog extends AbstractCatalog {
private List<PartitionEntry> getPartitionEntries(
Table table, ObjectPath tablePath, @Nullable CatalogPartitionSpec
partitionSpec)
throws TableNotPartitionedException {
- if (table.partitionKeys() == null || table.partitionKeys().size() ==
0) {
+ if (table.partitionKeys() == null || table.partitionKeys().isEmpty()) {
throw new TableNotPartitionedException(getName(), tablePath);
}
@@ -1267,7 +1262,7 @@ public class FlinkCatalog extends AbstractCatalog {
throws CatalogException {
try {
List<CatalogPartitionSpec> partitionSpecs =
getPartitionSpecs(tablePath, partitionSpec);
- return partitionSpecs.size() > 0;
+ return !partitionSpecs.isEmpty();
} catch (TableNotPartitionedException | TableNotExistException e) {
throw new CatalogException(e);
}
@@ -1444,9 +1439,7 @@ public class FlinkCatalog extends AbstractCatalog {
}
} catch (Catalog.FunctionNotExistException e) {
throw new FunctionNotExistException(getName(), functionPath);
- } catch (Catalog.DefinitionAlreadyExistException e) {
- throw new RuntimeException(e);
- } catch (Catalog.DefinitionNotExistException e) {
+ } catch (Catalog.DefinitionAlreadyExistException |
Catalog.DefinitionNotExistException e) {
throw new RuntimeException(e);
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalogFactory.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalogFactory.java
index fe4f55cbe9..2933207213 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalogFactory.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalogFactory.java
@@ -63,17 +63,11 @@ public class FlinkCatalogFactory implements
org.apache.flink.table.factories.Cat
CatalogFactory.createCatalog(context, classLoader),
catalogName,
context.options().get(DEFAULT_DATABASE),
- classLoader,
context.options());
}
public static FlinkCatalog createCatalog(String catalogName, Catalog
catalog, Options options) {
- return new FlinkCatalog(
- catalog,
- catalogName,
- Catalog.DEFAULT_DATABASE,
- FlinkCatalogFactory.class.getClassLoader(),
- options);
+ return new FlinkCatalog(catalog, catalogName,
Catalog.DEFAULT_DATABASE, options);
}
public static Catalog createPaimonCatalog(Options catalogOptions) {
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkGenericCatalogFactory.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkGenericCatalogFactory.java
index 7c3a13c6f3..6ec0d8a8df 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkGenericCatalogFactory.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkGenericCatalogFactory.java
@@ -96,7 +96,6 @@ public class FlinkGenericCatalogFactory implements
CatalogFactory {
CatalogContext.create(options, new
FlinkFileIOLoader()), cl),
name,
options.get(DEFAULT_DATABASE),
- cl,
options);
return new FlinkGenericCatalog(paimon, flinkCatalog);