This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new fa68a1e91 [core] Move common code from FileSystemCatalog and HiveCatalog into AbstractCatalog (#2124)
fa68a1e91 is described below
commit fa68a1e91ee77aeef0c0b99160cc093e56d2405a
Author: tsreaper <[email protected]>
AuthorDate: Fri Oct 13 09:53:07 2023 +0800
[core] Move common code from FileSystemCatalog and HiveCatalog into AbstractCatalog (#2124)
---
.../org/apache/paimon/catalog/AbstractCatalog.java | 155 +++++++++++++++++++-
.../apache/paimon/catalog/FileSystemCatalog.java | 116 +++------------
.../java/org/apache/paimon/hive/HiveCatalog.java | 157 ++++++---------------
3 files changed, 213 insertions(+), 215 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
index 29cd15747..0c12b6131 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
@@ -26,6 +26,8 @@ import org.apache.paimon.lineage.LineageMeta;
import org.apache.paimon.lineage.LineageMetaFactory;
import org.apache.paimon.operation.Lock;
import org.apache.paimon.options.Options;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.CatalogEnvironment;
import org.apache.paimon.table.FileStoreTable;
@@ -84,6 +86,149 @@ public abstract class AbstractCatalog implements Catalog {
options.get(key)));
}
+ @Override
+ public boolean databaseExists(String databaseName) {
+ if (isSystemDatabase(databaseName)) {
+ return true;
+ }
+
+ return databaseExistsImpl(databaseName);
+ }
+
+ protected abstract boolean databaseExistsImpl(String databaseName);
+
+ @Override
+ public void createDatabase(String name, boolean ignoreIfExists)
+ throws DatabaseAlreadyExistException {
+ if (isSystemDatabase(name)) {
+ throw new ProcessSystemDatabaseException();
+ }
+ if (databaseExists(name)) {
+ if (ignoreIfExists) {
+ return;
+ }
+ throw new DatabaseAlreadyExistException(name);
+ }
+
+ createDatabaseImpl(name);
+ }
+
+ protected abstract void createDatabaseImpl(String name);
+
+ @Override
+ public void dropDatabase(String name, boolean ignoreIfNotExists, boolean
cascade)
+ throws DatabaseNotExistException, DatabaseNotEmptyException {
+ if (isSystemDatabase(name)) {
+ throw new ProcessSystemDatabaseException();
+ }
+ if (!databaseExists(name)) {
+ if (ignoreIfNotExists) {
+ return;
+ }
+ throw new DatabaseNotExistException(name);
+ }
+
+ if (!cascade && listTables(name).size() > 0) {
+ throw new DatabaseNotEmptyException(name);
+ }
+
+ dropDatabaseImpl(name);
+ }
+
+ protected abstract void dropDatabaseImpl(String name);
+
+ @Override
+ public List<String> listTables(String databaseName) throws
DatabaseNotExistException {
+ if (isSystemDatabase(databaseName)) {
+ return GLOBAL_TABLES;
+ }
+ if (!databaseExists(databaseName)) {
+ throw new DatabaseNotExistException(databaseName);
+ }
+
+ return listTablesImpl(databaseName);
+ }
+
+ protected abstract List<String> listTablesImpl(String databaseName);
+
+ @Override
+ public void dropTable(Identifier identifier, boolean ignoreIfNotExists)
+ throws TableNotExistException {
+ checkNotSystemTable(identifier, "dropTable");
+ if (!tableExists(identifier)) {
+ if (ignoreIfNotExists) {
+ return;
+ }
+ throw new TableNotExistException(identifier);
+ }
+
+ dropTableImpl(identifier);
+ }
+
+ protected abstract void dropTableImpl(Identifier identifier);
+
+ @Override
+ public void createTable(Identifier identifier, Schema schema, boolean
ignoreIfExists)
+ throws TableAlreadyExistException, DatabaseNotExistException {
+ checkNotSystemTable(identifier, "createTable");
+ if (!databaseExists(identifier.getDatabaseName())) {
+ throw new DatabaseNotExistException(identifier.getDatabaseName());
+ }
+
+ if (tableExists(identifier)) {
+ if (ignoreIfExists) {
+ return;
+ }
+ throw new TableAlreadyExistException(identifier);
+ }
+
+ copyTableDefaultOptions(schema.options());
+
+ createTableImpl(identifier, schema);
+ }
+
+ protected abstract void createTableImpl(Identifier identifier, Schema
schema);
+
+ @Override
+ public void renameTable(Identifier fromTable, Identifier toTable, boolean
ignoreIfNotExists)
+ throws TableNotExistException, TableAlreadyExistException {
+ checkNotSystemTable(fromTable, "renameTable");
+ checkNotSystemTable(toTable, "renameTable");
+
+ if (!tableExists(fromTable)) {
+ if (ignoreIfNotExists) {
+ return;
+ }
+ throw new TableNotExistException(fromTable);
+ }
+
+ if (tableExists(toTable)) {
+ throw new TableAlreadyExistException(toTable);
+ }
+
+ renameTableImpl(fromTable, toTable);
+ }
+
+ protected abstract void renameTableImpl(Identifier fromTable, Identifier
toTable);
+
+ @Override
+ public void alterTable(
+ Identifier identifier, List<SchemaChange> changes, boolean
ignoreIfNotExists)
+ throws TableNotExistException, ColumnAlreadyExistException,
ColumnNotExistException {
+ checkNotSystemTable(identifier, "alterTable");
+ if (!tableExists(identifier)) {
+ if (ignoreIfNotExists) {
+ return;
+ }
+ throw new TableNotExistException(identifier);
+ }
+
+ alterTableImpl(identifier, changes);
+ }
+
+ protected abstract void alterTableImpl(Identifier identifier,
List<SchemaChange> changes)
+ throws TableNotExistException, ColumnAlreadyExistException,
ColumnNotExistException;
+
@Nullable
private LineageMeta findAndCreateLineageMeta(Options options, ClassLoader
classLoader) {
return options.getOptional(LINEAGE_META)
@@ -175,8 +320,12 @@ public abstract class AbstractCatalog implements Catalog {
return identifier.getObjectName().contains(SYSTEM_TABLE_SPLITTER);
}
- protected void checkNotSystemTable(Identifier identifier, String method) {
- if (isSystemDatabase(identifier.getDatabaseName()) ||
isSpecifiedSystemTable(identifier)) {
+ protected boolean isSystemTable(Identifier identifier) {
+ return isSystemDatabase(identifier.getDatabaseName()) ||
isSpecifiedSystemTable(identifier);
+ }
+
+ private void checkNotSystemTable(Identifier identifier, String method) {
+ if (isSystemTable(identifier)) {
throw new IllegalArgumentException(
String.format(
"Cannot '%s' for system table '%s', please use data table.",
@@ -213,7 +362,7 @@ public abstract class AbstractCatalog implements Catalog {
return new Path(warehouse, database + DB_SUFFIX);
}
- protected boolean isSystemDatabase(String database) {
+ private boolean isSystemDatabase(String database) {
return SYSTEM_DATABASE_NAME.equals(database);
}
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java
index a9dfb6e29..e2e88ce0c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java
@@ -65,58 +65,22 @@ public class FileSystemCatalog extends AbstractCatalog {
}
@Override
- public boolean databaseExists(String databaseName) {
- if (isSystemDatabase(databaseName)) {
- return true;
- }
+ protected boolean databaseExistsImpl(String databaseName) {
return uncheck(() -> fileIO.exists(databasePath(databaseName)));
}
@Override
- public void createDatabase(String name, boolean ignoreIfExists)
- throws DatabaseAlreadyExistException {
- if (isSystemDatabase(name)) {
- throw new ProcessSystemDatabaseException();
- }
- if (databaseExists(name)) {
- if (ignoreIfExists) {
- return;
- }
- throw new DatabaseAlreadyExistException(name);
- }
+ protected void createDatabaseImpl(String name) {
uncheck(() -> fileIO.mkdirs(databasePath(name)));
}
@Override
- public void dropDatabase(String name, boolean ignoreIfNotExists, boolean
cascade)
- throws DatabaseNotExistException, DatabaseNotEmptyException {
- if (isSystemDatabase(name)) {
- throw new ProcessSystemDatabaseException();
- }
- if (!databaseExists(name)) {
- if (ignoreIfNotExists) {
- return;
- }
-
- throw new DatabaseNotExistException(name);
- }
-
- if (!cascade && listTables(name).size() > 0) {
- throw new DatabaseNotEmptyException(name);
- }
-
+ protected void dropDatabaseImpl(String name) {
uncheck(() -> fileIO.delete(databasePath(name), true));
}
@Override
- public List<String> listTables(String databaseName) throws
DatabaseNotExistException {
- if (isSystemDatabase(databaseName)) {
- return GLOBAL_TABLES;
- }
- if (!databaseExists(databaseName)) {
- throw new DatabaseNotExistException(databaseName);
- }
-
+ protected List<String> listTablesImpl(String databaseName) {
List<String> tables = new ArrayList<>();
for (FileStatus status : uncheck(() ->
fileIO.listStatus(databasePath(databaseName)))) {
if (status.isDir() && tableExists(status.getPath())) {
@@ -127,11 +91,12 @@ public class FileSystemCatalog extends AbstractCatalog {
}
@Override
- public TableSchema getDataTableSchema(Identifier identifier) throws
TableNotExistException {
- Path path = getDataTableLocation(identifier);
- return new SchemaManager(fileIO, path)
- .latest()
- .orElseThrow(() -> new TableNotExistException(identifier));
+ public boolean tableExists(Identifier identifier) {
+ if (isSystemTable(identifier)) {
+ return super.tableExists(identifier);
+ }
+
+ return tableExists(getDataTableLocation(identifier));
}
private boolean tableExists(Path tablePath) {
@@ -139,74 +104,35 @@ public class FileSystemCatalog extends AbstractCatalog {
}
@Override
- public void dropTable(Identifier identifier, boolean ignoreIfNotExists)
- throws TableNotExistException {
- checkNotSystemTable(identifier, "dropTable");
+ public TableSchema getDataTableSchema(Identifier identifier) throws
TableNotExistException {
Path path = getDataTableLocation(identifier);
- if (!tableExists(path)) {
- if (ignoreIfNotExists) {
- return;
- }
-
- throw new TableNotExistException(identifier);
- }
+ return new SchemaManager(fileIO, path)
+ .latest()
+ .orElseThrow(() -> new TableNotExistException(identifier));
+ }
+ @Override
+ protected void dropTableImpl(Identifier identifier) {
+ Path path = getDataTableLocation(identifier);
uncheck(() -> fileIO.delete(path, true));
}
@Override
- public void createTable(Identifier identifier, Schema schema, boolean
ignoreIfExists)
- throws TableAlreadyExistException, DatabaseNotExistException {
- checkNotSystemTable(identifier, "createTable");
- if (!databaseExists(identifier.getDatabaseName())) {
- throw new DatabaseNotExistException(identifier.getDatabaseName());
- }
-
+ public void createTableImpl(Identifier identifier, Schema schema) {
Path path = getDataTableLocation(identifier);
- if (tableExists(path)) {
- if (ignoreIfExists) {
- return;
- }
-
- throw new TableAlreadyExistException(identifier);
- }
-
- copyTableDefaultOptions(schema.options());
-
uncheck(() -> new SchemaManager(fileIO, path).createTable(schema));
}
@Override
- public void renameTable(Identifier fromTable, Identifier toTable, boolean
ignoreIfNotExists)
- throws TableNotExistException, TableAlreadyExistException {
- checkNotSystemTable(fromTable, "renameTable");
- checkNotSystemTable(toTable, "renameTable");
+ public void renameTableImpl(Identifier fromTable, Identifier toTable) {
Path fromPath = getDataTableLocation(fromTable);
- if (!tableExists(fromPath)) {
- if (ignoreIfNotExists) {
- return;
- }
-
- throw new TableNotExistException(fromTable);
- }
-
Path toPath = getDataTableLocation(toTable);
- if (tableExists(toPath)) {
- throw new TableAlreadyExistException(toTable);
- }
-
uncheck(() -> fileIO.rename(fromPath, toPath));
}
@Override
- public void alterTable(
- Identifier identifier, List<SchemaChange> changes, boolean
ignoreIfNotExists)
+ protected void alterTableImpl(Identifier identifier, List<SchemaChange>
changes)
throws TableNotExistException, ColumnAlreadyExistException,
ColumnNotExistException {
- checkNotSystemTable(identifier, "alterTable");
- if (!tableExists(getDataTableLocation(identifier))) {
- throw new TableNotExistException(identifier);
- }
-
new SchemaManager(fileIO,
getDataTableLocation(identifier)).commitChanges(changes);
}
diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
index 8893960ea..59aaebe65 100644
--- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
+++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
@@ -42,14 +42,12 @@ import org.apache.hadoop.hive.metastore.HiveMetaHookLoader;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient;
-import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.SerDeInfo;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hadoop.hive.metastore.api.UnknownDBException;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.thrift.TException;
import org.slf4j.Logger;
@@ -171,10 +169,7 @@ public class HiveCatalog extends AbstractCatalog {
}
@Override
- public boolean databaseExists(String databaseName) {
- if (isSystemDatabase(databaseName)) {
- return true;
- }
+ protected boolean databaseExistsImpl(String databaseName) {
try {
client.getDatabase(databaseName);
return true;
@@ -187,93 +182,81 @@ public class HiveCatalog extends AbstractCatalog {
}
@Override
- public void createDatabase(String name, boolean ignoreIfExists)
- throws DatabaseAlreadyExistException {
- if (isSystemDatabase(name)) {
- throw new ProcessSystemDatabaseException();
- }
+ protected void createDatabaseImpl(String name) {
try {
client.createDatabase(convertToDatabase(name));
-
locationHelper.createPathIfRequired(databasePath(name), fileIO);
- } catch (AlreadyExistsException e) {
- if (!ignoreIfExists) {
- throw new DatabaseAlreadyExistException(name, e);
- }
} catch (TException | IOException e) {
throw new RuntimeException("Failed to create database " + name, e);
}
}
@Override
- public void dropDatabase(String name, boolean ignoreIfNotExists, boolean
cascade)
- throws DatabaseNotExistException, DatabaseNotEmptyException {
- if (isSystemDatabase(name)) {
- throw new ProcessSystemDatabaseException();
- }
+ protected void dropDatabaseImpl(String name) {
try {
- if (!cascade && client.getAllTables(name).size() > 0) {
- throw new DatabaseNotEmptyException(name);
- }
-
locationHelper.dropPathIfRequired(databasePath(name), fileIO);
client.dropDatabase(name, true, false, true);
- } catch (NoSuchObjectException | UnknownDBException e) {
- if (!ignoreIfNotExists) {
- throw new DatabaseNotExistException(name, e);
- }
} catch (TException | IOException e) {
throw new RuntimeException("Failed to drop database " + name, e);
}
}
@Override
- public List<String> listTables(String databaseName) throws
DatabaseNotExistException {
- if (isSystemDatabase(databaseName)) {
- return GLOBAL_TABLES;
- }
+ protected List<String> listTablesImpl(String databaseName) {
try {
return client.getAllTables(databaseName).stream()
.filter(
tableName -> {
Identifier identifier = new
Identifier(databaseName, tableName);
// the environment here may not be able to
access non-paimon
- // tables.
- // so we just check the schema file first
- return schemaFileExists(identifier)
- && paimonTableExists(identifier);
+ // tables, so we just check the schema file
first
+ return schemaFileExists(identifier) &&
tableExists(identifier);
})
.collect(Collectors.toList());
- } catch (UnknownDBException e) {
- throw new DatabaseNotExistException(databaseName, e);
} catch (TException e) {
throw new RuntimeException("Failed to list all tables in database " + databaseName, e);
}
}
+ @Override
+ public boolean tableExists(Identifier identifier) {
+ if (isSystemTable(identifier)) {
+ return super.tableExists(identifier);
+ }
+
+ Table table;
+ try {
+ table = client.getTable(identifier.getDatabaseName(),
identifier.getObjectName());
+ } catch (NoSuchObjectException e) {
+ return false;
+ } catch (TException e) {
+ throw new RuntimeException(
+ "Cannot determine if table " + identifier.getFullName() +
" is a paimon table.",
+ e);
+ }
+
+ return isPaimonTable(table) || LegacyHiveClasses.isPaimonTable(table);
+ }
+
+ private static boolean isPaimonTable(Table table) {
+ return INPUT_FORMAT_CLASS_NAME.equals(table.getSd().getInputFormat())
+ &&
OUTPUT_FORMAT_CLASS_NAME.equals(table.getSd().getOutputFormat());
+ }
+
@Override
public TableSchema getDataTableSchema(Identifier identifier) throws
TableNotExistException {
- if (!paimonTableExists(identifier)) {
+ if (!tableExists(identifier)) {
throw new TableNotExistException(identifier);
}
Path tableLocation = getDataTableLocation(identifier);
return new SchemaManager(fileIO, tableLocation)
.latest()
- .orElseThrow(() -> new RuntimeException("There is no paimond in " + tableLocation));
+ .orElseThrow(
+ () -> new RuntimeException("There is no paimon table in " + tableLocation));
}
@Override
- public void dropTable(Identifier identifier, boolean ignoreIfNotExists)
- throws TableNotExistException {
- checkNotSystemTable(identifier, "dropTable");
- if (!paimonTableExists(identifier)) {
- if (ignoreIfNotExists) {
- return;
- } else {
- throw new TableNotExistException(identifier);
- }
- }
-
+ protected void dropTableImpl(Identifier identifier) {
try {
client.dropTable(
identifier.getDatabaseName(), identifier.getObjectName(),
true, false, true);
@@ -294,27 +277,11 @@ public class HiveCatalog extends AbstractCatalog {
}
@Override
- public void createTable(Identifier identifier, Schema schema, boolean
ignoreIfExists)
- throws TableAlreadyExistException, DatabaseNotExistException {
- checkNotSystemTable(identifier, "createTable");
- String databaseName = identifier.getDatabaseName();
- if (!databaseExists(databaseName)) {
- throw new DatabaseNotExistException(databaseName);
- }
- if (paimonTableExists(identifier)) {
- if (ignoreIfExists) {
- return;
- } else {
- throw new TableAlreadyExistException(identifier);
- }
- }
-
+ protected void createTableImpl(Identifier identifier, Schema schema) {
checkFieldNamesUpperCase(schema.rowType().getFieldNames());
+
// first commit changes to underlying files
// if changes on Hive fails there is no harm to perform the same
changes to files again
-
- copyTableDefaultOptions(schema.options());
-
TableSchema tableSchema;
try {
tableSchema = schemaManager(identifier).createTable(schema);
@@ -325,6 +292,7 @@ public class HiveCatalog extends AbstractCatalog {
+ " to underlying files.",
e);
}
+
Table table =
newHmsTable(
identifier,
@@ -351,22 +319,7 @@ public class HiveCatalog extends AbstractCatalog {
}
@Override
- public void renameTable(Identifier fromTable, Identifier toTable, boolean
ignoreIfNotExists)
- throws TableNotExistException, TableAlreadyExistException {
- checkNotSystemTable(fromTable, "renameTable");
- checkNotSystemTable(toTable, "renameTable");
- if (!paimonTableExists(fromTable)) {
- if (ignoreIfNotExists) {
- return;
- } else {
- throw new TableNotExistException(fromTable);
- }
- }
-
- if (paimonTableExists(toTable)) {
- throw new TableAlreadyExistException(toTable);
- }
-
+ protected void renameTableImpl(Identifier fromTable, Identifier toTable) {
try {
checkIdentifierUpperCase(toTable);
String fromDB = fromTable.getDatabaseName();
@@ -401,18 +354,8 @@ public class HiveCatalog extends AbstractCatalog {
}
@Override
- public void alterTable(
- Identifier identifier, List<SchemaChange> changes, boolean
ignoreIfNotExists)
+ protected void alterTableImpl(Identifier identifier, List<SchemaChange>
changes)
throws TableNotExistException, ColumnAlreadyExistException,
ColumnNotExistException {
- checkNotSystemTable(identifier, "alterTable");
- if (!paimonTableExists(identifier)) {
- if (ignoreIfNotExists) {
- return;
- } else {
- throw new TableNotExistException(identifier);
- }
- }
-
checkFieldNamesUpperCaseInSchemaChange(changes);
final SchemaManager schemaManager = schemaManager(identifier);
@@ -579,26 +522,6 @@ public class HiveCatalog extends AbstractCatalog {
return new SchemaManager(fileIO,
getDataTableLocation(identifier)).latest().isPresent();
}
- private boolean paimonTableExists(Identifier identifier) {
- Table table;
- try {
- table = client.getTable(identifier.getDatabaseName(),
identifier.getObjectName());
- } catch (NoSuchObjectException e) {
- return false;
- } catch (TException e) {
- throw new RuntimeException(
- "Cannot determine if table " + identifier.getFullName() +
" is a paimon table.",
- e);
- }
-
- return isPaimonTable(table) || LegacyHiveClasses.isPaimonTable(table);
- }
-
- private static boolean isPaimonTable(Table table) {
- return INPUT_FORMAT_CLASS_NAME.equals(table.getSd().getInputFormat())
- &&
OUTPUT_FORMAT_CLASS_NAME.equals(table.getSd().getOutputFormat());
- }
-
private SchemaManager schemaManager(Identifier identifier) {
checkIdentifierUpperCase(identifier);
return new SchemaManager(fileIO, getDataTableLocation(identifier))