This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new d928e8b113 [Improve] Refactor S3FileCatalog and its factory (#7457)
d928e8b113 is described below
commit d928e8b11337710da82c06fb97aa135d5365fc33
Author: Dongyeon Lee <[email protected]>
AuthorDate: Fri Aug 23 11:39:51 2024 +0900
[Improve] Refactor S3FileCatalog and its factory (#7457)
---
.../file/catalog/AbstractFileCatalog.java | 11 +-
.../seatunnel/file/s3/catalog/S3FileCatalog.java | 118 ++-------------------
.../file/s3/catalog/S3FileCatalogFactory.java | 3 +-
3 files changed, 18 insertions(+), 114 deletions(-)
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/catalog/AbstractFileCatalog.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/catalog/AbstractFileCatalog.java
index f7a1b46a8b..9bac9062aa 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/catalog/AbstractFileCatalog.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/catalog/AbstractFileCatalog.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.fs.LocatedFileStatus;
import lombok.SneakyThrows;
+import java.io.IOException;
import java.util.List;
public abstract class AbstractFileCatalog implements Catalog {
@@ -51,7 +52,15 @@ public abstract class AbstractFileCatalog implements Catalog {
public void open() throws CatalogException {}
@Override
- public void close() throws CatalogException {}
+ public void close() throws CatalogException {
+ if (hadoopFileSystemProxy != null) {
+ try {
+ hadoopFileSystemProxy.close();
+ } catch (IOException e) {
+ throw new CatalogException(e);
+ }
+ }
+ }
@Override
public String name() {
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/catalog/S3FileCatalog.java b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/catalog/S3FileCatalog.java
index 0f48a2c4ae..7ed0a78c21 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/catalog/S3FileCatalog.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/catalog/S3FileCatalog.java
@@ -17,120 +17,14 @@
package org.apache.seatunnel.connectors.seatunnel.file.s3.catalog;
-import org.apache.seatunnel.api.configuration.ReadonlyConfig;
-import org.apache.seatunnel.api.table.catalog.Catalog;
-import org.apache.seatunnel.api.table.catalog.CatalogTable;
-import org.apache.seatunnel.api.table.catalog.TablePath;
-import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
-import
org.apache.seatunnel.api.table.catalog.exception.DatabaseAlreadyExistException;
-import
org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
-import
org.apache.seatunnel.api.table.catalog.exception.TableAlreadyExistException;
-import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException;
+import
org.apache.seatunnel.connectors.seatunnel.file.catalog.AbstractFileCatalog;
import
org.apache.seatunnel.connectors.seatunnel.file.hadoop.HadoopFileSystemProxy;
-import
org.apache.seatunnel.connectors.seatunnel.file.s3.config.S3ConfigOptions;
-import org.apache.commons.collections4.CollectionUtils;
-import org.apache.hadoop.fs.LocatedFileStatus;
+public class S3FileCatalog extends AbstractFileCatalog {
+ // TODO: this catalog name conflict with a factory identifier
+ public static final String CATALOG_NAME = "S3File";
-import lombok.AllArgsConstructor;
-import lombok.SneakyThrows;
-
-import java.io.IOException;
-import java.util.List;
-
-@AllArgsConstructor
-public class S3FileCatalog implements Catalog {
-
- private final HadoopFileSystemProxy hadoopFileSystemProxy;
- private final ReadonlyConfig readonlyConfig;
-
- @Override
- public void open() throws CatalogException {}
-
- @Override
- public void close() throws CatalogException {
- if (hadoopFileSystemProxy != null) {
- try {
- hadoopFileSystemProxy.close();
- } catch (IOException e) {
- throw new CatalogException(e);
- }
- }
- }
-
- @Override
- public String name() {
- return "S3File";
- }
-
- @Override
- public String getDefaultDatabase() throws CatalogException {
- return null;
- }
-
- @Override
- public boolean databaseExists(String databaseName) throws CatalogException
{
- return false;
- }
-
- @Override
- public List<String> listDatabases() throws CatalogException {
- return null;
- }
-
- @Override
- public List<String> listTables(String databaseName)
- throws CatalogException, DatabaseNotExistException {
- return null;
- }
-
- @SneakyThrows
- @Override
- public boolean tableExists(TablePath tablePath) throws CatalogException {
- return
hadoopFileSystemProxy.fileExist(readonlyConfig.get(S3ConfigOptions.FILE_PATH));
- }
-
- @Override
- public CatalogTable getTable(TablePath tablePath)
- throws CatalogException, TableNotExistException {
- return null;
- }
-
- @SneakyThrows
- @Override
- public void createTable(TablePath tablePath, CatalogTable table, boolean
ignoreIfExists)
- throws TableAlreadyExistException, DatabaseNotExistException,
CatalogException {
-
hadoopFileSystemProxy.createDir(readonlyConfig.get(S3ConfigOptions.FILE_PATH));
- }
-
- @SneakyThrows
- @Override
- public void dropTable(TablePath tablePath, boolean ignoreIfNotExists)
- throws TableNotExistException, CatalogException {
-
hadoopFileSystemProxy.deleteFile(readonlyConfig.get(S3ConfigOptions.FILE_PATH));
- }
-
- @Override
- public void createDatabase(TablePath tablePath, boolean ignoreIfExists)
- throws DatabaseAlreadyExistException, CatalogException {}
-
- @Override
- public void dropDatabase(TablePath tablePath, boolean ignoreIfNotExists)
- throws DatabaseNotExistException, CatalogException {}
-
- @SneakyThrows
- @Override
- public void truncateTable(TablePath tablePath, boolean ignoreIfNotExists)
- throws TableNotExistException, CatalogException {
-
hadoopFileSystemProxy.deleteFile(readonlyConfig.get(S3ConfigOptions.FILE_PATH));
-
hadoopFileSystemProxy.createDir(readonlyConfig.get(S3ConfigOptions.FILE_PATH));
- }
-
- @SneakyThrows
- @Override
- public boolean isExistsData(TablePath tablePath) {
- final List<LocatedFileStatus> locatedFileStatuses =
-
hadoopFileSystemProxy.listFile(readonlyConfig.get(S3ConfigOptions.FILE_PATH));
- return CollectionUtils.isNotEmpty(locatedFileStatuses);
+ public S3FileCatalog(HadoopFileSystemProxy hadoopFileSystemProxy, String
filePath) {
+ super(hadoopFileSystemProxy, filePath, CATALOG_NAME);
}
}
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/catalog/S3FileCatalogFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/catalog/S3FileCatalogFactory.java
index 53f350e10c..2747fac0c6 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/catalog/S3FileCatalogFactory.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/catalog/S3FileCatalogFactory.java
@@ -22,6 +22,7 @@ import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.table.catalog.Catalog;
import org.apache.seatunnel.api.table.factory.CatalogFactory;
import org.apache.seatunnel.api.table.factory.Factory;
+import
org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfigOptions;
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
import
org.apache.seatunnel.connectors.seatunnel.file.hadoop.HadoopFileSystemProxy;
import org.apache.seatunnel.connectors.seatunnel.file.s3.config.S3HadoopConf;
@@ -34,7 +35,7 @@ public class S3FileCatalogFactory implements CatalogFactory {
public Catalog createCatalog(String catalogName, ReadonlyConfig options) {
HadoopConf hadoopConf = S3HadoopConf.buildWithReadOnlyConfig(options);
HadoopFileSystemProxy fileSystemUtils = new
HadoopFileSystemProxy(hadoopConf);
- return new S3FileCatalog(fileSystemUtils, options);
+ return new S3FileCatalog(fileSystemUtils,
options.get(BaseSourceConfigOptions.FILE_PATH));
}
@Override