This is an automated email from the ASF dual-hosted git repository. vinish pushed a commit to branch 590-CatalogSync in repository https://gitbox.apache.org/repos/asf/incubator-xtable.git
commit f000fa16227ca2520327ee4aade8ea4dac9f1830 Author: Vinish Reddy <[email protected]> AuthorDate: Thu Dec 19 00:50:24 2024 -0800 Address comments --- .../apache/xtable/conversion/ConversionConfig.java | 6 +- .../org/apache/xtable/conversion/TargetTable.java | 4 -- xtable-core/pom.xml | 20 +++++++ .../xtable/conversion/ConversionController.java | 2 +- .../catalog/TestCatalogConversionFactory.java | 53 +---------------- .../conversion/TestConversionController.java | 3 +- .../org/apache/xtable/testutil/ITTestUtils.java | 59 ++++++++++++++++++ xtable-utilities/pom.xml | 9 +++ .../apache/xtable/utilities/RunCatalogSync.java | 69 +++++++++++++++++----- .../xtable/utilities/TestRunCatalogSync.java | 61 ------------------- .../src/test/resources/catalogConfig.yaml | 8 +-- 11 files changed, 154 insertions(+), 140 deletions(-) diff --git a/xtable-api/src/main/java/org/apache/xtable/conversion/ConversionConfig.java b/xtable-api/src/main/java/org/apache/xtable/conversion/ConversionConfig.java index 8ef52741..8d1e7697 100644 --- a/xtable-api/src/main/java/org/apache/xtable/conversion/ConversionConfig.java +++ b/xtable-api/src/main/java/org/apache/xtable/conversion/ConversionConfig.java @@ -36,8 +36,8 @@ public class ConversionConfig { // One or more targets to sync the table metadata to List<TargetTable> targetTables; // Each target table can be synced to multiple target catalogs, this is map from - // targetTableIdentifier to target catalogs. - Map<String, List<TargetCatalogConfig>> targetCatalogs; + // targetTable to target catalogs. + Map<TargetTable, List<TargetCatalogConfig>> targetCatalogs; // The mode, incremental or snapshot SyncMode syncMode; @@ -45,7 +45,7 @@ public class ConversionConfig { ConversionConfig( @NonNull SourceTable sourceTable, List<TargetTable> targetTables, - Map<String, List<TargetCatalogConfig>> targetCatalogs, + Map<TargetTable, List<TargetCatalogConfig>> targetCatalogs, SyncMode syncMode) { this.sourceTable = sourceTable; this.targetTables = targetTables; diff --git a/xtable-api/src/main/java/org/apache/xtable/conversion/TargetTable.java b/xtable-api/src/main/java/org/apache/xtable/conversion/TargetTable.java index 7f503b75..6256da2c 100644 --- a/xtable-api/src/main/java/org/apache/xtable/conversion/TargetTable.java +++ b/xtable-api/src/main/java/org/apache/xtable/conversion/TargetTable.java @@ -44,8 +44,4 @@ public class TargetTable extends ExternalTable { this.metadataRetention = metadataRetention == null ? Duration.of(7, ChronoUnit.DAYS) : metadataRetention; } - - public String getId() { - return String.format("%s#%s", sanitizeBasePath(this.basePath), formatName); - } } diff --git a/xtable-core/pom.xml b/xtable-core/pom.xml index f277495e..80de2299 100644 --- a/xtable-core/pom.xml +++ b/xtable-core/pom.xml @@ -174,4 +174,24 @@ <scope>test</scope> </dependency> </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + <executions> + <execution> + <goals> + <goal>test-jar</goal> + </goals> + <phase>test-compile</phase> + </execution> + </executions> + <configuration> + <skip>false</skip> + </configuration> + </plugin> + </plugins> + </build> </project> diff --git a/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionController.java b/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionController.java index df72ff5c..be0b7168 100644 --- a/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionController.java +++ b/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionController.java @@ -136,7 +136,7 @@ public class ConversionController { Map<String, SyncResult> catalogSyncResults = new HashMap<>(); for (TargetTable targetTable : config.getTargetTables()) { Map<CatalogTableIdentifier, CatalogSyncClient> catalogSyncClients = - config.getTargetCatalogs().get(targetTable.getId()).stream() + config.getTargetCatalogs().get(targetTable).stream() .collect( Collectors.toMap( TargetCatalogConfig::getCatalogTableIdentifier, diff --git a/xtable-core/src/test/java/org/apache/xtable/catalog/TestCatalogConversionFactory.java b/xtable-core/src/test/java/org/apache/xtable/catalog/TestCatalogConversionFactory.java index 5fd6523f..6a6a5592 100644 --- a/xtable-core/src/test/java/org/apache/xtable/catalog/TestCatalogConversionFactory.java +++ b/xtable-core/src/test/java/org/apache/xtable/catalog/TestCatalogConversionFactory.java @@ -26,12 +26,11 @@ import org.apache.hadoop.conf.Configuration; import org.junit.jupiter.api.Test; import org.apache.xtable.conversion.ExternalCatalogConfig; -import org.apache.xtable.conversion.SourceTable; import org.apache.xtable.conversion.TargetCatalogConfig; -import org.apache.xtable.model.InternalTable; import org.apache.xtable.model.catalog.CatalogTableIdentifier; import org.apache.xtable.spi.extractor.CatalogConversionSource; import org.apache.xtable.spi.sync.CatalogSyncClient; +import org.apache.xtable.testutil.ITTestUtils.TestCatalogImpl; class TestCatalogConversionFactory { @@ -69,54 +68,4 @@ class TestCatalogConversionFactory { .createCatalogSyncClient(targetCatalogConfig.getCatalogConfig(), new Configuration()); assertEquals(catalogSyncClient.getClass().getName(), TestCatalogImpl.class.getName()); } - - public static class TestCatalogImpl - implements CatalogSyncClient<Object>, CatalogConversionSource { - - public TestCatalogImpl(ExternalCatalogConfig catalogConfig, Configuration hadoopConf) {} - - @Override - public SourceTable getSourceTable(CatalogTableIdentifier tableIdentifier) { - return null; - } - - @Override - public String getCatalogName() { - return null; - } - - @Override - public String getStorageDescriptorLocation(Object o) { - return null; - } - - @Override - public boolean hasDatabase(String databaseName) { - return false; - } - - @Override - public void createDatabase(String databaseName) {} - - @Override - public Object getTable(CatalogTableIdentifier tableIdentifier) { - return null; - } - - @Override - public void createTable(InternalTable table, CatalogTableIdentifier tableIdentifier) {} - - @Override - public void refreshTable( - InternalTable table, Object catalogTable, CatalogTableIdentifier tableIdentifier) {} - - @Override - public void createOrReplaceTable(InternalTable table, CatalogTableIdentifier tableIdentifier) {} - - @Override - public void dropTable(InternalTable table, CatalogTableIdentifier tableIdentifier) {} - - @Override - public void close() throws Exception {} - } } diff --git a/xtable-core/src/test/java/org/apache/xtable/conversion/TestConversionController.java b/xtable-core/src/test/java/org/apache/xtable/conversion/TestConversionController.java index f7635d91..11dee760 100644 --- a/xtable-core/src/test/java/org/apache/xtable/conversion/TestConversionController.java +++ b/xtable-core/src/test/java/org/apache/xtable/conversion/TestConversionController.java @@ -40,6 +40,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -579,7 +580,7 @@ public class TestConversionController { .targetTables(targetTables) .targetCatalogs( targetTables.stream() - .collect(Collectors.toMap(TargetTable::getId, k -> targetCatalogs))) + .collect(Collectors.toMap(Function.identity(), k -> targetCatalogs))) .syncMode(syncMode) .build(); } diff --git a/xtable-core/src/test/java/org/apache/xtable/testutil/ITTestUtils.java b/xtable-core/src/test/java/org/apache/xtable/testutil/ITTestUtils.java index 281e61fe..4a113272 100644 --- a/xtable-core/src/test/java/org/apache/xtable/testutil/ITTestUtils.java +++ b/xtable-core/src/test/java/org/apache/xtable/testutil/ITTestUtils.java @@ -20,12 +20,18 @@ package org.apache.xtable.testutil; import java.util.List; +import org.apache.hadoop.conf.Configuration; import org.junit.jupiter.api.Assertions; +import org.apache.xtable.conversion.ExternalCatalogConfig; +import org.apache.xtable.conversion.SourceTable; import org.apache.xtable.model.InternalTable; +import org.apache.xtable.model.catalog.CatalogTableIdentifier; import org.apache.xtable.model.schema.InternalPartitionField; import org.apache.xtable.model.schema.InternalSchema; import org.apache.xtable.model.storage.DataLayoutStrategy; +import org.apache.xtable.spi.extractor.CatalogConversionSource; +import org.apache.xtable.spi.sync.CatalogSyncClient; public class ITTestUtils { @@ -44,4 +50,57 @@ public class ITTestUtils { Assertions.assertEquals(basePath, internalTable.getBasePath()); Assertions.assertEquals(partitioningFields, internalTable.getPartitioningFields()); } + + public static class TestCatalogImpl implements CatalogConversionSource, CatalogSyncClient { + + public TestCatalogImpl(ExternalCatalogConfig catalogConfig, Configuration hadoopConf) {} + + @Override + public SourceTable getSourceTable(CatalogTableIdentifier tableIdentifier) { + return SourceTable.builder() + .name("source_table_name") + .basePath("file://base_path/v1/") + .formatName("ICEBERG") + .build(); + } + + @Override + public String getCatalogName() { + return null; + } + + @Override + public String getStorageLocation(Object o) { + return null; + } + + @Override + public boolean hasDatabase(String databaseName) { + return false; + } + + @Override + public void createDatabase(String databaseName) {} + + @Override + public Object getTable(CatalogTableIdentifier tableIdentifier) { + return null; + } + + @Override + public void createTable(InternalTable table, CatalogTableIdentifier tableIdentifier) {} + + @Override + public void refreshTable( + InternalTable table, Object catalogTable, CatalogTableIdentifier tableIdentifier) {} + + @Override + public void createOrReplaceTable(InternalTable table, CatalogTableIdentifier tableIdentifier) {} + + @Override + public void dropTable(InternalTable table, CatalogTableIdentifier tableIdentifier) {} + + @Override + public void close() throws Exception {} + } } diff --git a/xtable-utilities/pom.xml b/xtable-utilities/pom.xml index 8191af3c..25d55973 100644 --- a/xtable-utilities/pom.xml +++ b/xtable-utilities/pom.xml @@ -35,6 +35,15 @@ <version>${project.version}</version> </dependency> + <dependency> + <groupId>org.apache.xtable</groupId> + <artifactId>xtable-core_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <classifier>tests</classifier> + <type>test-jar</type> + <scope>test</scope> + </dependency> + <!-- command line arg parsing --> <dependency> <groupId>commons-cli</groupId> diff --git a/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunCatalogSync.java b/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunCatalogSync.java index f69d75f5..28e26dd9 100644 --- a/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunCatalogSync.java +++ b/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunCatalogSync.java @@ -161,7 +161,7 @@ public class RunCatalogSync { dataset.getSourceCatalogTableIdentifier().getCatalogTableIdentifier()); } List<TargetTable> targetTables = new ArrayList<>(); - Map<String, List<TargetCatalogConfig>> targetCatalogs = new HashMap<>(); + Map<TargetTable, List<TargetCatalogConfig>> targetCatalogs = new HashMap<>(); for (TargetTableIdentifier targetCatalogTableIdentifier : dataset.getTargetCatalogTableIdentifiers()) { TargetTable targetTable = @@ -172,11 +172,11 @@ public class RunCatalogSync { .formatName(targetCatalogTableIdentifier.getTableFormat()) .build(); targetTables.add(targetTable); - if (!targetCatalogs.containsKey(targetTable.getId())) { - targetCatalogs.put(targetTable.getId(), new ArrayList<>()); + if (!targetCatalogs.containsKey(targetTable)) { + targetCatalogs.put(targetTable, new ArrayList<>()); } targetCatalogs - .get(targetTable.getId()) + .get(targetTable) .add( TargetCatalogConfig.builder() .catalogTableIdentifier( @@ -248,45 +248,86 @@ public class RunCatalogSync { @Data public static class DatasetConfig { + /** + * Configuration of the source catalog from which XTable will read. It must contain all the + * necessary connection and access details for describing and listing tables + */ private Catalog sourceCatalog; + /** + * Defines configuration one or more target catalogs, to which XTable will write or update + * tables. Unlike the source, these catalogs must be writable + */ private List<Catalog> targetCatalogs; + /** A list of datasets that specify how a source table maps to one or more target tables. */ private List<Dataset> datasets; + /** Configuration for catalog. */ @Data public static class Catalog { + /** A unique name for the catalog. */ private String catalogName; + /** + * The type of the source catalog. This might be a specific type understood by XTable, such as + * Hive, Glue etc. + */ private String catalogType; + /** + * (Optional) A fully qualified class name that implements the interfaces for + * CatalogSyncClient, it can be used if the implementation for catalogType doesn't exist in + * XTable. This is an optional field. + */ private String catalogImpl; + /** + * A collection of configs used to configure access or connection properties for the catalog. + */ private Map<String, String> catalogProperties; } @Data - public static class StorageIdentifier { - String tableFormat; - String tableBasePath; - String tableDataPath; - String tableName; - String partitionSpec; - String namespace; + public static class Dataset { + /** Identifies the source table in sourceCatalog. */ + private SourceTableIdentifier sourceCatalogTableIdentifier; + /** A list of one or more targets that this source table should be written to. */ + private List<TargetTableIdentifier> targetCatalogTableIdentifiers; } @Data public static class SourceTableIdentifier { + /** Specifies the table identifier in the source catalog. */ CatalogTableIdentifier catalogTableIdentifier; + /** + * (Optional) Provides direct storage details such as a table’s base path (like an S3 + * location) and the partition specification. This allows reading from a source even if it is + * not strictly registered in a catalog, as long as the format and location are known + */ StorageIdentifier storageIdentifier; } @Data public static class TargetTableIdentifier { + /** name of the target catalog where the table will be created or updated */ String catalogName; + /** + * The target table format (e.g., DELTA, HUDI, ICEBERG), specifying how the data will be + * stored at the target. + */ String tableFormat; + /** Specifies the table identifier in the target catalog. */ CatalogTableIdentifier catalogTableIdentifier; } + /** + * Configuration in storage for table. This is an optional field in {@link + * SourceTableIdentifier}. + */ @Data - public static class Dataset { - private SourceTableIdentifier sourceCatalogTableIdentifier; - private List<TargetTableIdentifier> targetCatalogTableIdentifiers; + public static class StorageIdentifier { + String tableFormat; + String tableBasePath; + String tableDataPath; + String tableName; + String partitionSpec; + String namespace; } } } diff --git a/xtable-utilities/src/test/java/org/apache/xtable/utilities/TestRunCatalogSync.java b/xtable-utilities/src/test/java/org/apache/xtable/utilities/TestRunCatalogSync.java index 7b77214f..243261b5 100644 --- a/xtable-utilities/src/test/java/org/apache/xtable/utilities/TestRunCatalogSync.java +++ b/xtable-utilities/src/test/java/org/apache/xtable/utilities/TestRunCatalogSync.java @@ -22,16 +22,8 @@ import static org.junit.jupiter.api.Assertions.*; import lombok.SneakyThrows; -import org.apache.hadoop.conf.Configuration; import org.junit.jupiter.api.Test; -import org.apache.xtable.conversion.ExternalCatalogConfig; -import org.apache.xtable.conversion.SourceTable; -import org.apache.xtable.model.InternalTable; -import org.apache.xtable.model.catalog.CatalogTableIdentifier; -import org.apache.xtable.spi.extractor.CatalogConversionSource; -import org.apache.xtable.spi.sync.CatalogSyncClient; - class TestRunCatalogSync { @SneakyThrows @@ -43,57 +35,4 @@ class TestRunCatalogSync { // Ensure yaml gets parsed and no op-sync implemented in TestCatalogImpl is called. assertDoesNotThrow(() -> RunCatalogSync.main(args)); } - - public static class TestCatalogImpl implements CatalogConversionSource, CatalogSyncClient { - - public TestCatalogImpl(ExternalCatalogConfig catalogConfig, Configuration hadoopConf) {} - - @Override - public SourceTable getSourceTable(CatalogTableIdentifier tableIdentifier) { - return SourceTable.builder() - .name("source_table_name") - .basePath("file://base_path/v1/") - .formatName("ICEBERG") - .build(); - } - - @Override - public String getCatalogName() { - return null; - } - - @Override - public String getStorageDescriptorLocation(Object o) { - return null; - } - - @Override - public boolean hasDatabase(String databaseName) { - return false; - } - - @Override - public void createDatabase(String databaseName) {} - - @Override - public Object getTable(CatalogTableIdentifier tableIdentifier) { - return null; - } - - @Override - public void createTable(InternalTable table, CatalogTableIdentifier tableIdentifier) {} - - @Override - public void refreshTable( - InternalTable table, Object catalogTable, CatalogTableIdentifier tableIdentifier) {} - - @Override - public void createOrReplaceTable(InternalTable table, CatalogTableIdentifier tableIdentifier) {} - - @Override - public void dropTable(InternalTable table, CatalogTableIdentifier tableIdentifier) {} - - @Override - public void close() throws Exception {} - } } diff --git a/xtable-utilities/src/test/resources/catalogConfig.yaml b/xtable-utilities/src/test/resources/catalogConfig.yaml index 6a4df0f0..f3f1757c 100644 --- a/xtable-utilities/src/test/resources/catalogConfig.yaml +++ b/xtable-utilities/src/test/resources/catalogConfig.yaml @@ -16,26 +16,26 @@ # sourceCatalog: catalogName: "source-1" - catalogImpl: "org.apache.xtable.utilities.TestRunCatalogSync$TestCatalogImpl" + catalogImpl: "org.apache.xtable.testutil.ITTestUtils$TestCatalogImpl" catalogProperties: key01: "value1" key02: "value2" key03: "value3" targetCatalogs: - catalogName: "target-1" - catalogImpl: "org.apache.xtable.utilities.TestRunCatalogSync$TestCatalogImpl" + catalogImpl: "org.apache.xtable.testutil.ITTestUtils$TestCatalogImpl" catalogProperties: key11: "value1" key12: "value2" key13: "value3" - catalogName: "target-2" - catalogImpl: "org.apache.xtable.utilities.TestRunCatalogSync$TestCatalogImpl" + catalogImpl: "org.apache.xtable.testutil.ITTestUtils$TestCatalogImpl" catalogProperties: key21: "value1" key22: "value2" key23: "value3" - catalogName: "target-3" - catalogImpl: "org.apache.xtable.utilities.TestRunCatalogSync$TestCatalogImpl" + catalogImpl: "org.apache.xtable.testutil.ITTestUtils$TestCatalogImpl" catalogProperties: key31: "value1" key32: "value2"
