This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 73c86a9bf1 [hotfix] Minor refactor SparkCatalog (#6188)
73c86a9bf1 is described below
commit 73c86a9bf1b5cfcb9318b615f56324cdf5a3e3ce
Author: Zouxxyy <[email protected]>
AuthorDate: Wed Sep 3 16:59:46 2025 +0800
[hotfix] Minor refactor SparkCatalog (#6188)
---
.../java/org/apache/paimon/catalog/Catalog.java | 4 +-
.../java/org/apache/paimon/spark/SparkCatalog.java | 223 +++++++++++----------
.../apache/paimon/spark/SparkGenericCatalog.java | 6 +
3 files changed, 123 insertions(+), 110 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
index 3109d7bc2d..702dc155a5 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
@@ -613,7 +613,7 @@ public interface Catalog extends AutoCloseable {
* will throw an {@link UnsupportedOperationException}, affect the
following methods:
*
* <ul>
- * <li>{@link #commitSnapshot(Identifier, Snapshot, List)}.
+ * <li>{@link #commitSnapshot(Identifier, String, Snapshot, List)}.
* <li>{@link #loadSnapshot(Identifier)}.
* <li>{@link #rollbackTo(Identifier, Instant)}.
* <li>{@link #createBranch(Identifier, String, String)}.
@@ -787,6 +787,8 @@ public interface Catalog extends AutoCloseable {
void alterPartitions(Identifier identifier, List<PartitionStatistics>
partitions)
throws TableNotExistException;
+ // ======================= Function methods ===============================
+
/**
* Get the names of all functions in this catalog.
*
diff --git
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
index eee3991ad3..ac0367d43a 100644
---
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
+++
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
@@ -96,6 +96,7 @@ import static org.apache.paimon.CoreOptions.FILE_FORMAT;
import static org.apache.paimon.CoreOptions.TYPE;
import static org.apache.paimon.TableType.FORMAT_TABLE;
import static org.apache.paimon.spark.SparkCatalogOptions.DEFAULT_DATABASE;
+import static org.apache.paimon.spark.SparkCatalogOptions.V1FUNCTION_ENABLED;
import static
org.apache.paimon.spark.SparkTypeUtils.CURRENT_DEFAULT_COLUMN_METADATA_KEY;
import static org.apache.paimon.spark.SparkTypeUtils.toPaimonType;
import static
org.apache.paimon.spark.util.OptionUtils.checkRequiredConfigurations;
@@ -137,9 +138,7 @@ public class SparkCatalog extends SparkBaseCatalog
this.defaultDatabase =
options.getOrDefault(DEFAULT_DATABASE.key(),
DEFAULT_DATABASE.defaultValue());
this.v1FunctionEnabled =
- options.getBoolean(
- SparkCatalogOptions.V1FUNCTION_ENABLED.key(),
-
SparkCatalogOptions.V1FUNCTION_ENABLED.defaultValue())
+ options.getBoolean(V1FUNCTION_ENABLED.key(),
V1FUNCTION_ENABLED.defaultValue())
&& DelegateCatalog.rootCatalog(catalog) instanceof
RESTCatalog;
if (v1FunctionEnabled) {
this.v1FunctionRegistry = new
PaimonV1FunctionRegistry(sparkSession);
@@ -147,7 +146,7 @@ public class SparkCatalog extends SparkBaseCatalog
try {
catalog.getDatabase(defaultDatabase);
} catch (Catalog.DatabaseNotExistException e) {
- LOG.warn(
+ LOG.info(
"Default database '{}' does not exist, caused by: {},
start to create it",
defaultDatabase,
ExceptionUtils.stringifyException(e));
@@ -163,6 +162,8 @@ public class SparkCatalog extends SparkBaseCatalog
return catalog;
}
+ // ======================= Database methods ===============================
+
@Override
public String[] defaultNamespace() {
return new String[] {defaultDatabase};
@@ -259,6 +260,22 @@ public class SparkCatalog extends SparkBaseCatalog
}
}
+ @Override
+ public void alterNamespace(String[] namespace, NamespaceChange... changes)
+ throws NoSuchNamespaceException {
+ checkNamespace(namespace);
+ try {
+ String databaseName = getDatabaseNameFromNamespace(namespace);
+ List<PropertyChange> propertyChanges =
+
Arrays.stream(changes).map(this::toPropertyChange).collect(Collectors.toList());
+ catalog.alterDatabase(databaseName, propertyChanges, false);
+ } catch (Catalog.DatabaseNotExistException e) {
+ throw new NoSuchNamespaceException(namespace);
+ }
+ }
+
+ // ======================= Table methods ===============================
+
@Override
public Identifier[] listTables(String[] namespace) throws
NoSuchNamespaceException {
checkNamespace(namespace);
@@ -499,111 +516,7 @@ public class SparkCatalog extends SparkBaseCatalog
}
}
- // --------------------- tools ------------------------------------------
-
- protected org.apache.spark.sql.connector.catalog.Table loadSparkTable(
- Identifier ident, Map<String, String> extraOptions) throws
NoSuchTableException {
- try {
- org.apache.paimon.table.Table paimonTable =
catalog.getTable(toIdentifier(ident));
- if (paimonTable instanceof FormatTable) {
- return convertToFileTable(ident, (FormatTable) paimonTable);
- } else {
- return new SparkTable(
- copyWithSQLConf(
- paimonTable, catalogName, toIdentifier(ident),
extraOptions));
- }
- } catch (Catalog.TableNotExistException e) {
- throw new NoSuchTableException(ident);
- }
- }
-
- private static FileTable convertToFileTable(Identifier ident, FormatTable
formatTable) {
- SparkSession spark = PaimonSparkSession$.MODULE$.active();
- StructType schema =
SparkTypeUtils.fromPaimonRowType(formatTable.rowType());
- StructType partitionSchema =
- SparkTypeUtils.fromPaimonRowType(
- TypeUtils.project(formatTable.rowType(),
formatTable.partitionKeys()));
- List<String> pathList = new ArrayList<>();
- pathList.add(formatTable.location());
- Options options = Options.fromMap(formatTable.options());
- CaseInsensitiveStringMap dsOptions = new
CaseInsensitiveStringMap(options.toMap());
- if (formatTable.format() == FormatTable.Format.CSV) {
- options.set("sep", options.get(CsvOptions.FIELD_DELIMITER));
- dsOptions = new CaseInsensitiveStringMap(options.toMap());
- return new PartitionedCSVTable(
- ident.name(),
- spark,
- dsOptions,
-
scala.collection.JavaConverters.asScalaBuffer(pathList).toSeq(),
- scala.Option.apply(schema),
- CSVFileFormat.class,
- partitionSchema);
- } else if (formatTable.format() == FormatTable.Format.ORC) {
- return new PartitionedOrcTable(
- ident.name(),
- spark,
- dsOptions,
-
scala.collection.JavaConverters.asScalaBuffer(pathList).toSeq(),
- scala.Option.apply(schema),
- OrcFileFormat.class,
- partitionSchema);
- } else if (formatTable.format() == FormatTable.Format.PARQUET) {
- return new PartitionedParquetTable(
- ident.name(),
- spark,
- dsOptions,
-
scala.collection.JavaConverters.asScalaBuffer(pathList).toSeq(),
- scala.Option.apply(schema),
- ParquetFileFormat.class,
- partitionSchema);
- } else if (formatTable.format() == FormatTable.Format.JSON) {
- return new PartitionedJsonTable(
- ident.name(),
- spark,
- dsOptions,
-
scala.collection.JavaConverters.asScalaBuffer(pathList).toSeq(),
- scala.Option.apply(schema),
- JsonFileFormat.class,
- partitionSchema);
- } else {
- throw new UnsupportedOperationException(
- "Unsupported format table "
- + ident.name()
- + " format "
- + formatTable.format().name());
- }
- }
-
- protected List<String> convertPartitionTransforms(Transform[] transforms) {
- List<String> partitionColNames = new ArrayList<>(transforms.length);
- for (Transform transform : transforms) {
- if (!(transform instanceof IdentityTransform)) {
- throw new UnsupportedOperationException(
- "Unsupported partition transform: " + transform);
- }
- NamedReference ref = ((IdentityTransform) transform).ref();
- if (!(ref instanceof FieldReference || ref.fieldNames().length !=
1)) {
- throw new UnsupportedOperationException(
- "Unsupported partition transform: " + transform);
- }
- partitionColNames.add(ref.fieldNames()[0]);
- }
- return partitionColNames;
- }
-
- @Override
- public void alterNamespace(String[] namespace, NamespaceChange... changes)
- throws NoSuchNamespaceException {
- checkNamespace(namespace);
- try {
- String databaseName = getDatabaseNameFromNamespace(namespace);
- List<PropertyChange> propertyChanges =
-
Arrays.stream(changes).map(this::toPropertyChange).collect(Collectors.toList());
- catalog.alterDatabase(databaseName, propertyChanges, false);
- } catch (Catalog.DatabaseNotExistException e) {
- throw new NoSuchNamespaceException(namespace);
- }
- }
+ // ======================= Function methods ===============================
@Override
public Identifier[] listFunctions(String[] namespace) throws
NoSuchNamespaceException {
@@ -716,6 +629,98 @@ public class SparkCatalog extends SparkBaseCatalog
.dropFunction(V1FunctionConverter.fromFunctionIdentifier(funcIdent), ifExists);
}
+ // ======================= Tools methods ===============================
+
+ protected org.apache.spark.sql.connector.catalog.Table loadSparkTable(
+ Identifier ident, Map<String, String> extraOptions) throws
NoSuchTableException {
+ try {
+ org.apache.paimon.table.Table paimonTable =
catalog.getTable(toIdentifier(ident));
+ if (paimonTable instanceof FormatTable) {
+ return convertToFileTable(ident, (FormatTable) paimonTable);
+ } else {
+ return new SparkTable(
+ copyWithSQLConf(
+ paimonTable, catalogName, toIdentifier(ident),
extraOptions));
+ }
+ } catch (Catalog.TableNotExistException e) {
+ throw new NoSuchTableException(ident);
+ }
+ }
+
+ private static FileTable convertToFileTable(Identifier ident, FormatTable
formatTable) {
+ SparkSession spark = PaimonSparkSession$.MODULE$.active();
+ StructType schema =
SparkTypeUtils.fromPaimonRowType(formatTable.rowType());
+ StructType partitionSchema =
+ SparkTypeUtils.fromPaimonRowType(
+ TypeUtils.project(formatTable.rowType(),
formatTable.partitionKeys()));
+ List<String> pathList = new ArrayList<>();
+ pathList.add(formatTable.location());
+ Options options = Options.fromMap(formatTable.options());
+ CaseInsensitiveStringMap dsOptions = new
CaseInsensitiveStringMap(options.toMap());
+ if (formatTable.format() == FormatTable.Format.CSV) {
+ options.set("sep", options.get(CsvOptions.FIELD_DELIMITER));
+ dsOptions = new CaseInsensitiveStringMap(options.toMap());
+ return new PartitionedCSVTable(
+ ident.name(),
+ spark,
+ dsOptions,
+
scala.collection.JavaConverters.asScalaBuffer(pathList).toSeq(),
+ scala.Option.apply(schema),
+ CSVFileFormat.class,
+ partitionSchema);
+ } else if (formatTable.format() == FormatTable.Format.ORC) {
+ return new PartitionedOrcTable(
+ ident.name(),
+ spark,
+ dsOptions,
+
scala.collection.JavaConverters.asScalaBuffer(pathList).toSeq(),
+ scala.Option.apply(schema),
+ OrcFileFormat.class,
+ partitionSchema);
+ } else if (formatTable.format() == FormatTable.Format.PARQUET) {
+ return new PartitionedParquetTable(
+ ident.name(),
+ spark,
+ dsOptions,
+
scala.collection.JavaConverters.asScalaBuffer(pathList).toSeq(),
+ scala.Option.apply(schema),
+ ParquetFileFormat.class,
+ partitionSchema);
+ } else if (formatTable.format() == FormatTable.Format.JSON) {
+ return new PartitionedJsonTable(
+ ident.name(),
+ spark,
+ dsOptions,
+
scala.collection.JavaConverters.asScalaBuffer(pathList).toSeq(),
+ scala.Option.apply(schema),
+ JsonFileFormat.class,
+ partitionSchema);
+ } else {
+ throw new UnsupportedOperationException(
+ "Unsupported format table "
+ + ident.name()
+ + " format "
+ + formatTable.format().name());
+ }
+ }
+
+ protected List<String> convertPartitionTransforms(Transform[] transforms) {
+ List<String> partitionColNames = new ArrayList<>(transforms.length);
+ for (Transform transform : transforms) {
+ if (!(transform instanceof IdentityTransform)) {
+ throw new UnsupportedOperationException(
+ "Unsupported partition transform: " + transform);
+ }
+ NamedReference ref = ((IdentityTransform) transform).ref();
+ if (!(ref instanceof FieldReference || ref.fieldNames().length !=
1)) {
+ throw new UnsupportedOperationException(
+ "Unsupported partition transform: " + transform);
+ }
+ partitionColNames.add(ref.fieldNames()[0]);
+ }
+ return partitionColNames;
+ }
+
private PropertyChange toPropertyChange(NamespaceChange change) {
if (change instanceof NamespaceChange.SetProperty) {
NamespaceChange.SetProperty set = (NamespaceChange.SetProperty)
change;
diff --git
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java
index e4563c492f..098b73a50b 100644
---
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java
+++
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java
@@ -93,6 +93,8 @@ public class SparkGenericCatalog extends SparkBaseCatalog
implements CatalogExte
return this.sparkCatalog.paimonCatalog();
}
+ // ======================= Database methods ===============================
+
@Override
public String[] defaultNamespace() {
return asNamespaceCatalog().defaultNamespace();
@@ -149,6 +151,8 @@ public class SparkGenericCatalog extends SparkBaseCatalog
implements CatalogExte
return asNamespaceCatalog().dropNamespace(namespace, cascade);
}
+ // ======================= Table methods ===============================
+
@Override
public Identifier[] listTables(String[] namespace) throws
NoSuchNamespaceException {
// delegate to the session catalog because all tables share the same
namespace
@@ -354,6 +358,8 @@ public class SparkGenericCatalog extends SparkBaseCatalog
implements CatalogExte
return (FunctionCatalog) getDelegateCatalog();
}
+ // ======================= Function methods ===============================
+
@Override
public Identifier[] listFunctions(String[] namespace) throws
NoSuchNamespaceException {
try {