This is an automated email from the ASF dual-hosted git repository.
vinish pushed a commit to branch 590-CatalogSync
in repository https://gitbox.apache.org/repos/asf/incubator-xtable.git
The following commit(s) were added to refs/heads/590-CatalogSync by this push:
new 6e09f5d9 Introduce catalogSyncClientImpl and
catalogConversionSourceImpl
6e09f5d9 is described below
commit 6e09f5d9c037501d010ed0be4f8376737f2534c2
Author: Vinish Reddy <[email protected]>
AuthorDate: Thu Dec 19 14:52:15 2024 -0800
Introduce catalogSyncClientImpl and catalogConversionSourceImpl
---
.../xtable/conversion/ExternalCatalogConfig.java | 25 +++++++---
.../xtable/catalog/CatalogConversionFactory.java | 9 ++--
.../catalog/ExternalCatalogConfigFactory.java | 9 ++--
.../xtable/conversion/ConversionController.java | 4 +-
.../catalog/TestCatalogConversionFactory.java | 20 ++++----
.../conversion/TestConversionController.java | 8 ++--
.../org/apache/xtable/testutil/ITTestUtils.java | 28 ++++++-----
.../apache/xtable/utilities/RunCatalogSync.java | 56 ++++------------------
.../src/test/resources/catalogConfig.yaml | 9 ++--
9 files changed, 81 insertions(+), 87 deletions(-)
diff --git
a/xtable-api/src/main/java/org/apache/xtable/conversion/ExternalCatalogConfig.java
b/xtable-api/src/main/java/org/apache/xtable/conversion/ExternalCatalogConfig.java
index 148cb2fb..6cacd907 100644
---
a/xtable-api/src/main/java/org/apache/xtable/conversion/ExternalCatalogConfig.java
+++
b/xtable-api/src/main/java/org/apache/xtable/conversion/ExternalCatalogConfig.java
@@ -26,13 +26,16 @@ import lombok.NonNull;
import lombok.Value;
/**
- * Defines the configuration for an external catalog, user needs to populate
at-least one of
- * catalogType or catalogImpl
+ * Defines the configuration for an external catalog; the user needs to populate
at least one of {@link
+ * ExternalCatalogConfig#catalogType}, {@link
ExternalCatalogConfig#catalogSyncClientImpl} or {@link
+ * ExternalCatalogConfig#catalogConversionSourceImpl}
*/
@Value
@Builder
public class ExternalCatalogConfig {
- /** The name of the catalog, it also acts as a unique identifier for each
catalog */
+ /**
+ * A user-defined unique identifier for the catalog; it allows the user to sync
a table to multiple
+ * catalogs of the same name/type, e.g. an HMS catalog with url1 and another HMS
catalog with url2.
+ */
@NonNull String catalogId;
/**
@@ -42,13 +45,21 @@ public class ExternalCatalogConfig {
String catalogType;
/**
- * (Optional) A fully qualified class name that implements the interfaces
for CatalogSyncClient,
- * it can be used if the implementation for catalogType doesn't exist in
XTable.
+ * (Optional) A fully qualified class name that implements the
{@link
+ * org.apache.xtable.spi.sync.CatalogSyncClient} interface; it can be used if
an implementation for
+ * catalogType doesn't exist in XTable.
+ */
+ String catalogSyncClientImpl;
+
+ /**
+ * (Optional) A fully qualified class name that implements the
{@link
+ * org.apache.xtable.spi.extractor.CatalogConversionSource} interface; it can be
used if an implementation
+ * for catalogType doesn't exist in XTable.
*/
- String catalogImpl;
+ String catalogConversionSourceImpl;
/**
* The properties for each catalog, used for providing any custom behaviour
during catalog sync
*/
- @NonNull @Builder.Default Map<String, String> catalogOptions =
Collections.emptyMap();
+ @NonNull @Builder.Default Map<String, String> catalogProperties =
Collections.emptyMap();
}
diff --git
a/xtable-core/src/main/java/org/apache/xtable/catalog/CatalogConversionFactory.java
b/xtable-core/src/main/java/org/apache/xtable/catalog/CatalogConversionFactory.java
index 2a4e5d2e..37778eb8 100644
---
a/xtable-core/src/main/java/org/apache/xtable/catalog/CatalogConversionFactory.java
+++
b/xtable-core/src/main/java/org/apache/xtable/catalog/CatalogConversionFactory.java
@@ -42,7 +42,7 @@ public class CatalogConversionFactory {
public static CatalogConversionSource createCatalogConversionSource(
ExternalCatalogConfig sourceCatalogConfig, Configuration configuration) {
return ReflectionUtils.createInstanceOfClass(
- sourceCatalogConfig.getCatalogImpl(), sourceCatalogConfig,
configuration);
+ sourceCatalogConfig.getCatalogConversionSourceImpl(),
sourceCatalogConfig, configuration);
}
/**
@@ -53,8 +53,11 @@ public class CatalogConversionFactory {
* @param configuration hadoop configuration
*/
public CatalogSyncClient createCatalogSyncClient(
- ExternalCatalogConfig targetCatalogConfig, Configuration configuration) {
+ ExternalCatalogConfig targetCatalogConfig, String tableFormat,
Configuration configuration) {
return ReflectionUtils.createInstanceOfClass(
- targetCatalogConfig.getCatalogImpl(), targetCatalogConfig,
configuration);
+ targetCatalogConfig.getCatalogSyncClientImpl(),
+ targetCatalogConfig,
+ tableFormat,
+ configuration);
}
}
diff --git
a/xtable-core/src/main/java/org/apache/xtable/catalog/ExternalCatalogConfigFactory.java
b/xtable-core/src/main/java/org/apache/xtable/catalog/ExternalCatalogConfigFactory.java
index f053df32..3649ae8e 100644
---
a/xtable-core/src/main/java/org/apache/xtable/catalog/ExternalCatalogConfigFactory.java
+++
b/xtable-core/src/main/java/org/apache/xtable/catalog/ExternalCatalogConfigFactory.java
@@ -28,11 +28,14 @@ public class ExternalCatalogConfigFactory {
public static ExternalCatalogConfig fromCatalogType(
String catalogType, String catalogId, Map<String, String> properties) {
// TODO: Choose existing implementation based on catalogType.
- String catalogImpl = "";
+ String catalogSyncClientImpl = "";
+ String catalogConversionSourceImpl = "";
return ExternalCatalogConfig.builder()
- .catalogImpl(catalogImpl)
+ .catalogType(catalogType)
+ .catalogSyncClientImpl(catalogSyncClientImpl)
+ .catalogConversionSourceImpl(catalogConversionSourceImpl)
.catalogId(catalogId)
- .catalogOptions(properties)
+ .catalogProperties(properties)
.build();
}
}
diff --git
a/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionController.java
b/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionController.java
index be0b7168..cd79ccb2 100644
---
a/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionController.java
+++
b/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionController.java
@@ -142,7 +142,9 @@ public class ConversionController {
TargetCatalogConfig::getCatalogTableIdentifier,
targetCatalog ->
catalogConversionFactory.createCatalogSyncClient(
- targetCatalog.getCatalogConfig(), conf)));
+ targetCatalog.getCatalogConfig(),
+ targetTable.getFormatName(),
+ conf)));
catalogSyncResults.put(
targetTable.getFormatName(),
syncCatalogsForTargetTable(
diff --git
a/xtable-core/src/test/java/org/apache/xtable/catalog/TestCatalogConversionFactory.java
b/xtable-core/src/test/java/org/apache/xtable/catalog/TestCatalogConversionFactory.java
index 386ecd64..53e68923 100644
---
a/xtable-core/src/test/java/org/apache/xtable/catalog/TestCatalogConversionFactory.java
+++
b/xtable-core/src/test/java/org/apache/xtable/catalog/TestCatalogConversionFactory.java
@@ -30,7 +30,8 @@ import org.apache.xtable.conversion.TargetCatalogConfig;
import org.apache.xtable.model.catalog.CatalogTableIdentifier;
import org.apache.xtable.spi.extractor.CatalogConversionSource;
import org.apache.xtable.spi.sync.CatalogSyncClient;
-import org.apache.xtable.testutil.ITTestUtils.TestCatalogImpl;
+import org.apache.xtable.testutil.ITTestUtils.TestCatalogConversionSourceImpl;
+import org.apache.xtable.testutil.ITTestUtils.TestCatalogSyncImpl;
class TestCatalogConversionFactory {
@@ -39,12 +40,14 @@ class TestCatalogConversionFactory {
ExternalCatalogConfig sourceCatalog =
ExternalCatalogConfig.builder()
.catalogId("catalogId")
- .catalogImpl(TestCatalogImpl.class.getName())
- .catalogOptions(Collections.emptyMap())
+
.catalogConversionSourceImpl(TestCatalogConversionSourceImpl.class.getName())
+ .catalogProperties(Collections.emptyMap())
.build();
CatalogConversionSource catalogConversionSource =
CatalogConversionFactory.createCatalogConversionSource(sourceCatalog,
new Configuration());
- assertEquals(catalogConversionSource.getClass().getName(),
TestCatalogImpl.class.getName());
+ assertEquals(
+ catalogConversionSource.getClass().getName(),
+ TestCatalogConversionSourceImpl.class.getName());
}
@Test
@@ -54,8 +57,8 @@ class TestCatalogConversionFactory {
.catalogConfig(
ExternalCatalogConfig.builder()
.catalogId("catalogId")
- .catalogImpl(TestCatalogImpl.class.getName())
- .catalogOptions(Collections.emptyMap())
+ .catalogSyncClientImpl(TestCatalogSyncImpl.class.getName())
+ .catalogProperties(Collections.emptyMap())
.build())
.catalogTableIdentifier(
CatalogTableIdentifier.builder()
@@ -65,7 +68,8 @@ class TestCatalogConversionFactory {
.build();
CatalogSyncClient catalogSyncClient =
CatalogConversionFactory.getInstance()
- .createCatalogSyncClient(targetCatalogConfig.getCatalogConfig(),
new Configuration());
- assertEquals(catalogSyncClient.getClass().getName(),
TestCatalogImpl.class.getName());
+ .createCatalogSyncClient(
+ targetCatalogConfig.getCatalogConfig(), "TABLE_FORMAT", new
Configuration());
+ assertEquals(catalogSyncClient.getClass().getName(),
TestCatalogSyncImpl.class.getName());
}
}
diff --git
a/xtable-core/src/test/java/org/apache/xtable/conversion/TestConversionController.java
b/xtable-core/src/test/java/org/apache/xtable/conversion/TestConversionController.java
index c1fb9863..0600c5ee 100644
---
a/xtable-core/src/test/java/org/apache/xtable/conversion/TestConversionController.java
+++
b/xtable-core/src/test/java/org/apache/xtable/conversion/TestConversionController.java
@@ -459,10 +459,10 @@ public class TestConversionController {
.thenReturn(tableFormatSyncResults);
// Mocks for catalogSync.
when(mockCatalogConversionFactory.createCatalogSyncClient(
- targetCatalogs.get(0).getCatalogConfig(), mockConf))
+ eq(targetCatalogs.get(0).getCatalogConfig()), any(), eq(mockConf)))
.thenReturn(mockCatalogSyncClient1);
when(mockCatalogConversionFactory.createCatalogSyncClient(
- targetCatalogs.get(1).getCatalogConfig(), mockConf))
+ eq(targetCatalogs.get(1).getCatalogConfig()), any(), eq(mockConf)))
.thenReturn(mockCatalogSyncClient2);
when(catalogSync.syncTable(
eq(
@@ -590,8 +590,8 @@ public class TestConversionController {
.catalogConfig(
ExternalCatalogConfig.builder()
.catalogId("catalogId-" + suffix)
- .catalogImpl("catalogImpl-" + suffix)
- .catalogOptions(Collections.emptyMap())
+ .catalogSyncClientImpl("catalogImpl-" + suffix)
+ .catalogProperties(Collections.emptyMap())
.build())
.catalogTableIdentifier(
CatalogTableIdentifier.builder()
diff --git
a/xtable-core/src/test/java/org/apache/xtable/testutil/ITTestUtils.java
b/xtable-core/src/test/java/org/apache/xtable/testutil/ITTestUtils.java
index 9a4f18ad..04be989d 100644
--- a/xtable-core/src/test/java/org/apache/xtable/testutil/ITTestUtils.java
+++ b/xtable-core/src/test/java/org/apache/xtable/testutil/ITTestUtils.java
@@ -51,18 +51,10 @@ public class ITTestUtils {
Assertions.assertEquals(partitioningFields,
internalTable.getPartitioningFields());
}
- public static class TestCatalogImpl implements CatalogConversionSource,
CatalogSyncClient {
+ public static class TestCatalogSyncImpl implements CatalogSyncClient {
- public TestCatalogImpl(ExternalCatalogConfig catalogConfig, Configuration
hadoopConf) {}
-
- @Override
- public SourceTable getSourceTable(CatalogTableIdentifier tableIdentifier) {
- return SourceTable.builder()
- .name("source_table_name")
- .basePath("file://base_path/v1/")
- .formatName("ICEBERG")
- .build();
- }
+ public TestCatalogSyncImpl(
+ ExternalCatalogConfig catalogConfig, String tableFormat, Configuration
hadoopConf) {}
@Override
public String getCatalogId() {
@@ -103,4 +95,18 @@ public class ITTestUtils {
@Override
public void close() throws Exception {}
}
+
+ /**
+ * Test stub {@link CatalogConversionSource} that ignores its configuration and always
+ * returns the same fixed {@link SourceTable}. Its constructor signature matches the
+ * arguments passed by CatalogConversionFactory#createCatalogConversionSource, which
+ * instantiates it reflectively from its class name.
+ */
+ public static class TestCatalogConversionSourceImpl implements
CatalogConversionSource {
+ /** Accepts and ignores the catalog config and the Hadoop configuration. */
+ public TestCatalogConversionSourceImpl(
+ ExternalCatalogConfig sourceCatalogConfig, Configuration
configuration) {}
+
+ /** Returns a fixed ICEBERG source table, regardless of {@code tableIdentifier}. */
+ @Override
+ public SourceTable getSourceTable(CatalogTableIdentifier tableIdentifier) {
+ return SourceTable.builder()
+ .name("source_table_name")
+ .basePath("file://base_path/v1/")
+ .formatName("ICEBERG")
+ .build();
+ }
+ }
}
diff --git
a/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunCatalogSync.java
b/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunCatalogSync.java
index b77ebd09..a423fbc2 100644
---
a/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunCatalogSync.java
+++
b/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunCatalogSync.java
@@ -42,7 +42,6 @@ import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
-import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -50,7 +49,6 @@ import com.fasterxml.jackson.databind.ObjectReader;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import org.apache.xtable.catalog.CatalogConversionFactory;
-import org.apache.xtable.catalog.ExternalCatalogConfigFactory;
import org.apache.xtable.conversion.ConversionConfig;
import org.apache.xtable.conversion.ConversionController;
import org.apache.xtable.conversion.ConversionSourceProvider;
@@ -132,12 +130,12 @@ public class RunCatalogSync {
RunSync.TableFormatConverters tableFormatConverters =
loadTableFormatConversionConfigs(customConfig);
- Map<String, DatasetConfig.Catalog> catalogsByName =
+ Map<String, ExternalCatalogConfig> catalogsById =
datasetConfig.getTargetCatalogs().stream()
- .collect(Collectors.toMap(DatasetConfig.Catalog::getCatalogId,
Function.identity()));
- ExternalCatalogConfig sourceCatalogConfig =
getCatalogConfig(datasetConfig.getSourceCatalog());
+ .collect(Collectors.toMap(ExternalCatalogConfig::getCatalogId,
Function.identity()));
CatalogConversionSource catalogConversionSource =
-
CatalogConversionFactory.createCatalogConversionSource(sourceCatalogConfig,
hadoopConf);
+ CatalogConversionFactory.createCatalogConversionSource(
+ datasetConfig.getSourceCatalog(), hadoopConf);
ConversionController conversionController = new
ConversionController(hadoopConf);
for (DatasetConfig.Dataset dataset : datasetConfig.getDatasets()) {
SourceTable sourceTable = null;
@@ -181,9 +179,7 @@ public class RunCatalogSync {
TargetCatalogConfig.builder()
.catalogTableIdentifier(
targetCatalogTableIdentifier.getCatalogTableIdentifier())
- .catalogConfig(
- getCatalogConfig(
-
catalogsByName.get(targetCatalogTableIdentifier.getCatalogId())))
+
.catalogConfig(catalogsById.get(targetCatalogTableIdentifier.getCatalogId()))
.build());
}
ConversionConfig conversionConfig =
@@ -208,19 +204,6 @@ public class RunCatalogSync {
}
}
- static ExternalCatalogConfig getCatalogConfig(DatasetConfig.Catalog catalog)
{
- if (!StringUtils.isEmpty(catalog.getCatalogType())) {
- return ExternalCatalogConfigFactory.fromCatalogType(
- catalog.getCatalogType(), catalog.getCatalogId(),
catalog.getCatalogProperties());
- } else {
- return ExternalCatalogConfig.builder()
- .catalogId(catalog.getCatalogId())
- .catalogImpl(catalog.getCatalogImpl())
- .catalogOptions(catalog.getCatalogProperties())
- .build();
- }
- }
-
static Map<String, ConversionSourceProvider> getConversionSourceProviders(
List<String> tableFormats,
RunSync.TableFormatConverters tableFormatConverters,
@@ -252,36 +235,17 @@ public class RunCatalogSync {
* Configuration of the source catalog from which XTable will read. It
must contain all the
* necessary connection and access details for describing and listing
tables
*/
- private Catalog sourceCatalog;
+ private ExternalCatalogConfig sourceCatalog;
/**
* Defines configuration one or more target catalogs, to which XTable will
write or update
* tables. Unlike the source, these catalogs must be writable
*/
- private List<Catalog> targetCatalogs;
+ private List<ExternalCatalogConfig> targetCatalogs;
/** A list of datasets that specify how a source table maps to one or more
target tables. */
private List<Dataset> datasets;
/** Configuration for catalog. */
- @Data
- public static class Catalog {
- /** A user defined unique identifier for the catalog. */
- private String catalogId;
- /**
- * The type of the source catalog. This might be a specific type
understood by XTable, such as
- * Hive, Glue etc.
- */
- private String catalogType;
- /**
- * (Optional) A fully qualified class name that implements the
interfaces for
- * CatalogSyncClient, it can be used if the implementation for
catalogType doesn't exist in
- * XTable. This is an optional field.
- */
- private String catalogImpl;
- /**
- * A collection of configs used to configure access or connection
properties for the catalog.
- */
- private Map<String, String> catalogProperties;
- }
+ ExternalCatalogConfig catalogConfig;
@Data
public static class Dataset {
@@ -306,8 +270,8 @@ public class RunCatalogSync {
@Data
public static class TargetTableIdentifier {
/**
- * The user defined unique identifier of the target {@link Catalog}
where the table will be
- * created or updated
+ * The user-defined unique identifier of the target catalog where the
table will be created or
+ * updated; it is looked up against the catalogId of the configured target catalogs.
*/
String catalogId;
/**
diff --git a/xtable-utilities/src/test/resources/catalogConfig.yaml
b/xtable-utilities/src/test/resources/catalogConfig.yaml
index 185f47b3..84773235 100644
--- a/xtable-utilities/src/test/resources/catalogConfig.yaml
+++ b/xtable-utilities/src/test/resources/catalogConfig.yaml
@@ -16,26 +16,27 @@
#
sourceCatalog:
catalogId: "source-1"
- catalogImpl: "org.apache.xtable.testutil.ITTestUtils$TestCatalogImpl"
+ catalogConversionSourceImpl:
"org.apache.xtable.testutil.ITTestUtils$TestCatalogConversionSourceImpl"
+ catalogSyncClientImpl:
"org.apache.xtable.testutil.ITTestUtils$TestCatalogSyncImpl"
catalogProperties:
key01: "value1"
key02: "value2"
key03: "value3"
targetCatalogs:
- catalogId: "target-1"
- catalogImpl: "org.apache.xtable.testutil.ITTestUtils$TestCatalogImpl"
+ catalogSyncClientImpl:
"org.apache.xtable.testutil.ITTestUtils$TestCatalogSyncImpl"
catalogProperties:
key11: "value1"
key12: "value2"
key13: "value3"
- catalogId: "target-2"
- catalogImpl: "org.apache.xtable.testutil.ITTestUtils$TestCatalogImpl"
+ catalogSyncClientImpl:
"org.apache.xtable.testutil.ITTestUtils$TestCatalogSyncImpl"
catalogProperties:
key21: "value1"
key22: "value2"
key23: "value3"
- catalogId: "target-3"
- catalogImpl: "org.apache.xtable.testutil.ITTestUtils$TestCatalogImpl"
+ catalogSyncClientImpl:
"org.apache.xtable.testutil.ITTestUtils$TestCatalogSyncImpl"
catalogProperties:
key31: "value1"
key32: "value2"