This is an automated email from the ASF dual-hosted git repository.
vinish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-xtable.git
The following commit(s) were added to refs/heads/main by this push:
new 9fa812e3 [590] Add RunCatalogSync utility for synchronizing tables
across catalogs
9fa812e3 is described below
commit 9fa812e32df3bba25cfa27398c10a7c9eb24e2e2
Author: Vinish Reddy <[email protected]>
AuthorDate: Mon Jan 27 11:15:03 2025 -0800
[590] Add RunCatalogSync utility for synchronizing tables across catalogs
---
.../apache/xtable/conversion/ConversionConfig.java | 12 +-
.../xtable/conversion/ExternalCatalogConfig.java | 65 ++++
.../xtable/conversion/TargetCatalogConfig.java | 21 +-
.../apache/xtable/model/storage/CatalogType.java | 24 +-
.../spi/extractor/CatalogConversionSource.java | 3 +
.../apache/xtable/spi/sync/CatalogSyncClient.java | 3 +
xtable-core/pom.xml | 20 ++
.../xtable/catalog/CatalogConversionFactory.java | 96 ++++++
.../xtable/conversion/ConversionController.java | 212 +++++++++---
.../ConversionUtils.java} | 27 +-
.../xtable/iceberg/IcebergCatalogConfig.java | 8 +-
.../catalog/TestCatalogConversionFactory.java | 108 ++++++
.../conversion/TestConversionController.java | 174 +++++++++-
.../org/apache/xtable/testutil/ITTestUtils.java | 108 ++++++
...he.xtable.spi.extractor.CatalogConversionSource | 18 +
.../org.apache.xtable.spi.sync.CatalogSyncClient | 18 +
xtable-utilities/pom.xml | 23 ++
.../apache/xtable/utilities/RunCatalogSync.java | 365 +++++++++++++++++++++
.../java/org/apache/xtable/utilities/RunSync.java | 4 +-
.../apache/xtable/utilities/ITRunCatalogSync.java | 142 ++++++++
.../xtable/utilities/TestRunCatalogSync.java | 28 +-
.../src/test/resources/catalogConfig.yaml | 71 ++++
22 files changed, 1441 insertions(+), 109 deletions(-)
diff --git
a/xtable-api/src/main/java/org/apache/xtable/conversion/ConversionConfig.java
b/xtable-api/src/main/java/org/apache/xtable/conversion/ConversionConfig.java
index 73e2628d..63e9d673 100644
---
a/xtable-api/src/main/java/org/apache/xtable/conversion/ConversionConfig.java
+++
b/xtable-api/src/main/java/org/apache/xtable/conversion/ConversionConfig.java
@@ -18,7 +18,9 @@
package org.apache.xtable.conversion;
+import java.util.Collections;
import java.util.List;
+import java.util.Map;
import lombok.Builder;
import lombok.NonNull;
@@ -29,22 +31,30 @@ import com.google.common.base.Preconditions;
import org.apache.xtable.model.sync.SyncMode;
@Value
+// NOTE(review): the constructor below is also annotated with @Builder, so this class-level
+// annotation looks redundant; confirm against the Lombok-generated code before removing either.
+@Builder
public class ConversionConfig {
// The source of the sync
@NonNull SourceTable sourceTable;
// One or more targets to sync the table metadata to
List<TargetTable> targetTables;
+ // Each target table can be synced to multiple target catalogs; this maps each target table to
+ // the catalogs it should be synced to. The map itself is never null after construction, but a
+ // target table may have no entry, in which case get() returns null.
+ Map<TargetTable, List<TargetCatalogConfig>> targetCatalogs;
// The mode, incremental or snapshot
SyncMode syncMode;
@Builder
ConversionConfig(
- @NonNull SourceTable sourceTable, List<TargetTable> targetTables,
SyncMode syncMode) {
+ @NonNull SourceTable sourceTable,
+ List<TargetTable> targetTables,
+ Map<TargetTable, List<TargetCatalogConfig>> targetCatalogs,
+ SyncMode syncMode) {
this.sourceTable = sourceTable;
this.targetTables = targetTables;
Preconditions.checkArgument(
targetTables != null && !targetTables.isEmpty(),
"Please provide at-least one format to sync");
+ // Default to an empty map so callers never have to null-check the map itself.
+ this.targetCatalogs = targetCatalogs == null ? Collections.emptyMap() :
targetCatalogs;
this.syncMode = syncMode == null ? SyncMode.INCREMENTAL : syncMode;
}
}
diff --git
a/xtable-api/src/main/java/org/apache/xtable/conversion/ExternalCatalogConfig.java
b/xtable-api/src/main/java/org/apache/xtable/conversion/ExternalCatalogConfig.java
new file mode 100644
index 00000000..b525d831
--- /dev/null
+++
b/xtable-api/src/main/java/org/apache/xtable/conversion/ExternalCatalogConfig.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.xtable.conversion;
+
+import java.util.Collections;
+import java.util.Map;
+
+import lombok.Builder;
+import lombok.NonNull;
+import lombok.Value;
+
+/**
+ * Defines the configuration for an external catalog. Users need to populate at least one of
+ * {@link ExternalCatalogConfig#catalogType} or the implementation class for the role the catalog
+ * is used in: {@link ExternalCatalogConfig#catalogSyncClientImpl} when it is a sync target, or
+ * {@link ExternalCatalogConfig#catalogConversionSourceImpl} when it is a conversion source. This
+ * class does not itself validate that requirement.
+ */
+@Value
+@Builder
+public class ExternalCatalogConfig {
+ /**
+ * A user-defined unique identifier for the catalog. It allows a table to be synced to multiple
+ * catalogs of the same type, e.g. an HMS catalog at url1 and another HMS catalog at url2.
+ */
+ @NonNull String catalogId;
+
+ /**
+ * The type of the catalog. If an implementation for this catalogType exists in XTable, the
+ * implementation class will be inferred (implementations are discovered through {@link
+ * java.util.ServiceLoader} and matched by an exact, case-sensitive comparison of their
+ * getCatalogType()).
+ */
+ String catalogType;
+
+ /**
+ * (Optional) A fully qualified class name that implements {@link
+ * org.apache.xtable.spi.sync.CatalogSyncClient}. Use it when no implementation for catalogType
+ * exists in XTable.
+ */
+ String catalogSyncClientImpl;
+
+ /**
+ * (Optional) A fully qualified class name that implements {@link
+ * org.apache.xtable.spi.extractor.CatalogConversionSource}. Use it when no implementation for
+ * catalogType exists in XTable.
+ */
+ String catalogConversionSourceImpl;
+
+ /**
+ * The properties for this catalog, used to provide any custom behaviour during catalog sync.
+ * Defaults to an empty map.
+ */
+ @NonNull @Builder.Default Map<String, String> catalogProperties =
Collections.emptyMap();
+}
diff --git
a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergCatalogConfig.java
b/xtable-api/src/main/java/org/apache/xtable/conversion/TargetCatalogConfig.java
similarity index 64%
copy from
xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergCatalogConfig.java
copy to
xtable-api/src/main/java/org/apache/xtable/conversion/TargetCatalogConfig.java
index d5d7a3c5..d6687523 100644
---
a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergCatalogConfig.java
+++
b/xtable-api/src/main/java/org/apache/xtable/conversion/TargetCatalogConfig.java
@@ -16,21 +16,24 @@
* limitations under the License.
*/
-package org.apache.xtable.iceberg;
-
-import java.util.Collections;
-import java.util.Map;
+package org.apache.xtable.conversion;
import lombok.Builder;
import lombok.NonNull;
import lombok.Value;
-import org.apache.xtable.conversion.CatalogConfig;
+import org.apache.xtable.model.catalog.CatalogTableIdentifier;
+/**
+ * TargetCatalogConfig contains the parameters required to sync a {@link TargetTable} to a single
+ * catalog: the table identifier to use in that catalog and the catalog's configuration.
+ */
@Value
@Builder
-public class IcebergCatalogConfig implements CatalogConfig {
- @NonNull String catalogImpl;
- @NonNull String catalogName;
- @NonNull @Builder.Default Map<String, String> catalogOptions =
Collections.emptyMap();
+public class TargetCatalogConfig {
+ /** The table identifier that will be used when syncing the {@link TargetTable} to the catalog. */
+ @NonNull CatalogTableIdentifier catalogTableIdentifier;
+
+ /** Configuration of the catalog the {@link TargetTable} is synced to. */
+ @NonNull ExternalCatalogConfig catalogConfig;
}
diff --git
a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergCatalogConfig.java
b/xtable-api/src/main/java/org/apache/xtable/model/storage/CatalogType.java
similarity index 65%
copy from
xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergCatalogConfig.java
copy to
xtable-api/src/main/java/org/apache/xtable/model/storage/CatalogType.java
index d5d7a3c5..e2b028dc 100644
---
a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergCatalogConfig.java
+++ b/xtable-api/src/main/java/org/apache/xtable/model/storage/CatalogType.java
@@ -16,21 +16,13 @@
* limitations under the License.
*/
-package org.apache.xtable.iceberg;
+package org.apache.xtable.model.storage;
-import java.util.Collections;
-import java.util.Map;
-
-import lombok.Builder;
-import lombok.NonNull;
-import lombok.Value;
-
-import org.apache.xtable.conversion.CatalogConfig;
-
-@Value
-@Builder
-public class IcebergCatalogConfig implements CatalogConfig {
- @NonNull String catalogImpl;
- @NonNull String catalogName;
- @NonNull @Builder.Default Map<String, String> catalogOptions =
Collections.emptyMap();
+/**
+ * Default constants for supported catalog types.
+ *
+ * <p>Catalog implementations report their type from {@code getCatalogType()}; when a catalog is
+ * configured by type, the configured value must match the reported value exactly.
+ *
+ * @since 0.1
+ */
+public class CatalogType {
+ // NOTE(review): presumably the type for tables tracked only in storage, without an external
+ // catalog service; confirm.
+ public static final String STORAGE = "STORAGE";
}
diff --git
a/xtable-api/src/main/java/org/apache/xtable/spi/extractor/CatalogConversionSource.java
b/xtable-api/src/main/java/org/apache/xtable/spi/extractor/CatalogConversionSource.java
index 616f3d45..1525e6fa 100644
---
a/xtable-api/src/main/java/org/apache/xtable/spi/extractor/CatalogConversionSource.java
+++
b/xtable-api/src/main/java/org/apache/xtable/spi/extractor/CatalogConversionSource.java
@@ -29,4 +29,7 @@ import org.apache.xtable.model.catalog.CatalogTableIdentifier;
public interface CatalogConversionSource {
/** Returns the source table object present in the catalog. */
SourceTable getSourceTable(CatalogTableIdentifier tableIdentifier);
+
+ /**
+ * Returns the catalog type this conversion source handles, such as one of the constants in
+ * {@link org.apache.xtable.model.storage.CatalogType}. When a catalog is configured by type,
+ * implementations are selected by an exact, case-sensitive match on this value.
+ */
+ String getCatalogType();
}
diff --git
a/xtable-api/src/main/java/org/apache/xtable/spi/sync/CatalogSyncClient.java
b/xtable-api/src/main/java/org/apache/xtable/spi/sync/CatalogSyncClient.java
index 62de9379..cc322854 100644
--- a/xtable-api/src/main/java/org/apache/xtable/spi/sync/CatalogSyncClient.java
+++ b/xtable-api/src/main/java/org/apache/xtable/spi/sync/CatalogSyncClient.java
@@ -34,6 +34,9 @@ public interface CatalogSyncClient<TABLE> extends
AutoCloseable {
*/
String getCatalogId();
+ /**
+ * Returns the catalog type this client syncs to, such as one of the constants in {@link
+ * org.apache.xtable.model.storage.CatalogType}. When a catalog is configured by type,
+ * implementations are selected by an exact, case-sensitive match on this value.
+ */
+ String getCatalogType();
+
/** Returns the storage location of the table synced to the catalog. */
String getStorageLocation(TABLE table);
diff --git a/xtable-core/pom.xml b/xtable-core/pom.xml
index f277495e..80de2299 100644
--- a/xtable-core/pom.xml
+++ b/xtable-core/pom.xml
@@ -174,4 +174,24 @@
<scope>test</scope>
</dependency>
</dependencies>
+
+ <build>
+ <plugins>
+ <!-- Packages this module's test classes as a test-jar during the test-compile phase so other
+ modules can reuse shared test utilities. NOTE(review): presumably consumed by the
+ xtable-utilities tests (e.g. ITTestUtils); confirm. -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <executions>
+ <execution>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ <phase>test-compile</phase>
+ </execution>
+ </executions>
+ <configuration>
+ <!-- NOTE(review): forces the test-jar to be built; presumably overrides a skip that would
+ otherwise be inherited (e.g. from maven.test.skip or a parent POM); confirm. -->
+ <skip>false</skip>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
</project>
diff --git
a/xtable-core/src/main/java/org/apache/xtable/catalog/CatalogConversionFactory.java
b/xtable-core/src/main/java/org/apache/xtable/catalog/CatalogConversionFactory.java
new file mode 100644
index 00000000..add95c21
--- /dev/null
+++
b/xtable-core/src/main/java/org/apache/xtable/catalog/CatalogConversionFactory.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.xtable.catalog;
+
+import java.util.ServiceLoader;
+import java.util.function.Function;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+
+import org.apache.xtable.conversion.ExternalCatalogConfig;
+import org.apache.xtable.exception.NotSupportedException;
+import org.apache.xtable.reflection.ReflectionUtils;
+import org.apache.xtable.spi.extractor.CatalogConversionSource;
+import org.apache.xtable.spi.sync.CatalogSyncClient;
+
+/**
+ * Creates {@link CatalogConversionSource} and {@link CatalogSyncClient} implementations from an
+ * {@link ExternalCatalogConfig}.
+ *
+ * <p>If the config sets a catalog type, the implementation is discovered through {@link
+ * ServiceLoader} by an exact, case-sensitive match on its {@code getCatalogType()}. Otherwise the
+ * configured implementation class is instantiated reflectively.
+ */
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public class CatalogConversionFactory {
+  private static final CatalogConversionFactory INSTANCE = new CatalogConversionFactory();
+
+  /** Returns the shared factory instance. */
+  public static CatalogConversionFactory getInstance() {
+    return INSTANCE;
+  }
+
+  /**
+   * Returns an implementation of {@link CatalogConversionSource} that's used for converting table
+   * definitions in the catalog to {@link org.apache.xtable.conversion.SourceTable} objects.
+   *
+   * @param sourceCatalogConfig configuration for the source catalog
+   * @param configuration hadoop configuration
+   * @return the catalog conversion source for {@code sourceCatalogConfig}
+   * @throws NotSupportedException if a catalog type is set but no implementation reports it
+   * @throws IllegalArgumentException if neither a catalog type nor catalogConversionSourceImpl is
+   *     set
+   */
+  public static CatalogConversionSource createCatalogConversionSource(
+      ExternalCatalogConfig sourceCatalogConfig, Configuration configuration) {
+    if (!StringUtils.isEmpty(sourceCatalogConfig.getCatalogType())) {
+      // NOTE(review): the ServiceLoader-created instance is returned as-is; sourceCatalogConfig
+      // and configuration are not passed to it. Confirm implementations do not need them.
+      return findInstance(
+          CatalogConversionSource.class,
+          sourceCatalogConfig.getCatalogType(),
+          CatalogConversionSource::getCatalogType);
+    }
+    String implClassName = sourceCatalogConfig.getCatalogConversionSourceImpl();
+    if (StringUtils.isEmpty(implClassName)) {
+      // Fail with a configuration error instead of handing a null class name to reflection.
+      throw new IllegalArgumentException(
+          "Either catalogType or catalogConversionSourceImpl must be set for catalog: "
+              + sourceCatalogConfig.getCatalogId());
+    }
+    return ReflectionUtils.createInstanceOfClass(
+        implClassName, sourceCatalogConfig, configuration);
+  }
+
+  /**
+   * Returns an implementation of {@link CatalogSyncClient} that's used for syncing {@link
+   * org.apache.xtable.conversion.TargetTable} to a catalog.
+   *
+   * @param targetCatalogConfig configuration for the target catalog
+   * @param tableFormat table format of the target table the client will sync
+   * @param configuration hadoop configuration
+   * @return the catalog sync client for {@code targetCatalogConfig}
+   * @throws NotSupportedException if a catalog type is set but no implementation reports it
+   * @throws IllegalArgumentException if neither a catalog type nor catalogSyncClientImpl is set
+   */
+  public <TABLE> CatalogSyncClient<TABLE> createCatalogSyncClient(
+      ExternalCatalogConfig targetCatalogConfig, String tableFormat, Configuration configuration) {
+    if (!StringUtils.isEmpty(targetCatalogConfig.getCatalogType())) {
+      // NOTE(review): the ServiceLoader-created instance is returned as-is; targetCatalogConfig,
+      // tableFormat and configuration are not passed to it. Confirm implementations do not need
+      // them.
+      return findInstance(
+          CatalogSyncClient.class,
+          targetCatalogConfig.getCatalogType(),
+          CatalogSyncClient::getCatalogType);
+    }
+    String implClassName = targetCatalogConfig.getCatalogSyncClientImpl();
+    if (StringUtils.isEmpty(implClassName)) {
+      // Fail with a configuration error instead of handing a null class name to reflection.
+      throw new IllegalArgumentException(
+          "Either catalogType or catalogSyncClientImpl must be set for catalog: "
+              + targetCatalogConfig.getCatalogId());
+    }
+    return ReflectionUtils.createInstanceOfClass(
+        implClassName, targetCatalogConfig, tableFormat, configuration);
+  }
+
+  /**
+   * Returns the first {@link ServiceLoader}-discovered implementation of {@code serviceClass}
+   * whose catalog type equals {@code catalogType} (case-sensitive).
+   *
+   * @throws NotSupportedException if no implementation reports the requested catalog type
+   */
+  private static <T> T findInstance(
+      Class<T> serviceClass, String catalogType, Function<T, String> catalogTypeExtractor) {
+    ServiceLoader<T> loader = ServiceLoader.load(serviceClass);
+    for (T instance : loader) {
+      String instanceCatalogType = catalogTypeExtractor.apply(instance);
+      if (catalogType.equals(instanceCatalogType)) {
+        return instance;
+      }
+    }
+    throw new NotSupportedException("catalogType is not yet supported: " + catalogType);
+  }
+}
diff --git
a/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionController.java
b/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionController.java
index 222652a6..1db145ee 100644
---
a/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionController.java
+++
b/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionController.java
@@ -18,6 +18,8 @@
package org.apache.xtable.conversion;
+import static
org.apache.xtable.conversion.ConversionUtils.convertToSourceTable;
+
import java.io.IOException;
import java.time.Instant;
import java.util.Collection;
@@ -37,15 +39,19 @@ import lombok.extern.log4j.Log4j2;
import org.apache.hadoop.conf.Configuration;
+import org.apache.xtable.catalog.CatalogConversionFactory;
import org.apache.xtable.exception.ReadException;
import org.apache.xtable.model.IncrementalTableChanges;
import org.apache.xtable.model.InstantsForIncrementalSync;
import org.apache.xtable.model.InternalSnapshot;
+import org.apache.xtable.model.catalog.CatalogTableIdentifier;
import org.apache.xtable.model.metadata.TableSyncMetadata;
import org.apache.xtable.model.sync.SyncMode;
import org.apache.xtable.model.sync.SyncResult;
import org.apache.xtable.spi.extractor.ConversionSource;
import org.apache.xtable.spi.extractor.ExtractFromSource;
+import org.apache.xtable.spi.sync.CatalogSync;
+import org.apache.xtable.spi.sync.CatalogSyncClient;
import org.apache.xtable.spi.sync.ConversionTarget;
import org.apache.xtable.spi.sync.TableFormatSync;
@@ -64,10 +70,17 @@ import org.apache.xtable.spi.sync.TableFormatSync;
public class ConversionController {
private final Configuration conf;
private final ConversionTargetFactory conversionTargetFactory;
+ private final CatalogConversionFactory catalogConversionFactory;
private final TableFormatSync tableFormatSync;
+ private final CatalogSync catalogSync;
+ /**
+ * Creates a controller that uses the default instances returned by {@link
+ * ConversionTargetFactory#getInstance()}, {@link CatalogConversionFactory#getInstance()},
+ * {@link TableFormatSync#getInstance()} and {@link CatalogSync#getInstance()}.
+ *
+ * @param conf hadoop configuration, passed through to the delegated constructor
+ */
public ConversionController(Configuration conf) {
- this(conf, ConversionTargetFactory.getInstance(),
TableFormatSync.getInstance());
+ this(
+ conf,
+ ConversionTargetFactory.getInstance(),
+ CatalogConversionFactory.getInstance(),
+ TableFormatSync.getInstance(),
+ CatalogSync.getInstance());
}
/**
@@ -89,57 +102,146 @@ public class ConversionController {
try (ConversionSource<COMMIT> conversionSource =
conversionSourceProvider.getConversionSourceInstance(config.getSourceTable())) {
ExtractFromSource<COMMIT> source =
ExtractFromSource.of(conversionSource);
+ return syncTableFormats(config, source, config.getSyncMode());
+ } catch (IOException ioException) {
+ throw new ReadException("Failed to close source converter", ioException);
+ }
+ }
- Map<String, ConversionTarget> conversionTargetByFormat =
- config.getTargetTables().stream()
- .collect(
- Collectors.toMap(
- TargetTable::getFormatName,
- targetTable ->
conversionTargetFactory.createForFormat(targetTable, conf)));
- // State for each TableFormat
- Map<String, Optional<TableSyncMetadata>> lastSyncMetadataByFormat =
- conversionTargetByFormat.entrySet().stream()
- .collect(
- Collectors.toMap(
- Map.Entry::getKey, entry ->
entry.getValue().getTableMetadata()));
- Map<String, ConversionTarget> formatsToSyncIncrementally =
- getFormatsToSyncIncrementally(
- config,
- conversionTargetByFormat,
- lastSyncMetadataByFormat,
- source.getConversionSource());
- Map<String, ConversionTarget> formatsToSyncBySnapshot =
- conversionTargetByFormat.entrySet().stream()
- .filter(entry ->
!formatsToSyncIncrementally.containsKey(entry.getKey()))
- .collect(Collectors.toMap(Map.Entry::getKey,
Map.Entry::getValue));
- SyncResultForTableFormats syncResultForSnapshotSync =
- formatsToSyncBySnapshot.isEmpty()
- ? SyncResultForTableFormats.builder().build()
- : syncSnapshot(formatsToSyncBySnapshot, source);
- SyncResultForTableFormats syncResultForIncrementalSync =
- formatsToSyncIncrementally.isEmpty()
- ? SyncResultForTableFormats.builder().build()
- : syncIncrementalChanges(
- formatsToSyncIncrementally, lastSyncMetadataByFormat,
source);
- Map<String, SyncResult> syncResultsMerged =
- new HashMap<>(syncResultForIncrementalSync.getLastSyncResult());
- syncResultsMerged.putAll(syncResultForSnapshotSync.getLastSyncResult());
- String successfulSyncs =
- getFormatsWithStatusCode(syncResultsMerged,
SyncResult.SyncStatusCode.SUCCESS);
- if (!successfulSyncs.isEmpty()) {
- log.info("Sync is successful for the following formats {}",
successfulSyncs);
- }
- String failedSyncs =
- getFormatsWithStatusCode(syncResultsMerged,
SyncResult.SyncStatusCode.ERROR);
- if (!failedSyncs.isEmpty()) {
- log.error("Sync failed for the following formats {}", failedSyncs);
+  /**
+   * Synchronizes the source table in the conversion config to multiple target catalogs. Target
+   * tables whose format differs from the source table's format are first synced through table
+   * format conversion. Each target table is then synced to every catalog configured for it in
+   * {@link ConversionConfig#getTargetCatalogs()}. Target tables without configured catalogs are
+   * skipped during catalog sync.
+   *
+   * @param config a per-table config containing the source table, target tables, target catalogs
+   *     and sync mode.
+   * @param conversionSourceProvider providers of {@link ConversionSource} instances keyed by table
+   *     format. It must contain the source table's format and the format of every target table
+   *     that has catalogs configured. {@link ConversionSourceProvider#init(Configuration)} must be
+   *     called on each provider before calling this method.
+   * @return a map from table format to its sync result; table-format and catalog sync results for
+   *     the same format are merged into one entry.
+   * @throws IllegalArgumentException if no target tables are configured or a required provider is
+   *     missing.
+   */
+  public Map<String, SyncResult> syncTableAcrossCatalogs(
+      ConversionConfig config, Map<String, ConversionSourceProvider> conversionSourceProvider) {
+    if (config.getTargetTables() == null || config.getTargetTables().isEmpty()) {
+      throw new IllegalArgumentException("Please provide at-least one format to sync");
+    }
+    try (ConversionSource conversionSource =
+        getConversionSourceProvider(
+                conversionSourceProvider, config.getSourceTable().getFormatName())
+            .getConversionSourceInstance(config.getSourceTable())) {
+      ExtractFromSource source = ExtractFromSource.of(conversionSource);
+      Map<String, SyncResult> tableFormatSyncResults =
+          syncTableFormats(config, source, config.getSyncMode());
+      Map<String, SyncResult> catalogSyncResults = new HashMap<>();
+      for (TargetTable targetTable : config.getTargetTables()) {
+        // getTargetCatalogs() is never null, but a target table may have no entry in it.
+        Collection<TargetCatalogConfig> targetCatalogs =
+            config.getTargetCatalogs().getOrDefault(targetTable, Collections.emptyList());
+        if (targetCatalogs.isEmpty()) {
+          continue;
+        }
+        // NOTE(review): clients are keyed by table identifier, so two catalogs configured with the
+        // same identifier for one target table make Collectors.toMap throw
+        // IllegalStateException. Confirm whether CatalogSync should be keyed by catalog instead.
+        Map<CatalogTableIdentifier, CatalogSyncClient> catalogSyncClients =
+            targetCatalogs.stream()
+                .collect(
+                    Collectors.toMap(
+                        TargetCatalogConfig::getCatalogTableIdentifier,
+                        targetCatalog ->
+                            catalogConversionFactory.createCatalogSyncClient(
+                                targetCatalog.getCatalogConfig(),
+                                targetTable.getFormatName(),
+                                conf)));
+        catalogSyncResults.put(
+            targetTable.getFormatName(),
+            syncCatalogsForTargetTable(
+                targetTable,
+                catalogSyncClients,
+                getConversionSourceProvider(
+                    conversionSourceProvider, targetTable.getFormatName())));
}
- return syncResultsMerged;
+      mergeSyncResults(tableFormatSyncResults, catalogSyncResults);
+      return tableFormatSyncResults;
} catch (IOException ioException) {
throw new ReadException("Failed to close source converter", ioException);
}
}
+
+  /**
+   * Looks up the {@link ConversionSourceProvider} registered for a table format.
+   *
+   * @throws IllegalArgumentException if no provider is registered for {@code tableFormat}
+   */
+  private static ConversionSourceProvider getConversionSourceProvider(
+      Map<String, ConversionSourceProvider> conversionSourceProviders, String tableFormat) {
+    ConversionSourceProvider provider = conversionSourceProviders.get(tableFormat);
+    if (provider == null) {
+      throw new IllegalArgumentException(
+          "No ConversionSourceProvider configured for table format: " + tableFormat);
+    }
+    return provider;
+  }
+ /**
+ * Synchronizes the source table's metadata to every target table format in the config that
+ * differs from the source table's format.
+ *
+ * @param config a per-table config; its source table and target tables are used here.
+ * @param source an extractor wrapping the {@link ConversionSource}, used to fetch the current
+ * snapshot or incremental table changes.
+ * @param syncMode the requested sync mode; {@link SyncMode#FULL} forces a snapshot sync for all
+ * formats, otherwise formats are synced incrementally where possible.
+ * @return a map from table format to its sync result.
+ */
+ private <COMMIT> Map<String, SyncResult> syncTableFormats(
+ ConversionConfig config, ExtractFromSource<COMMIT> source, SyncMode
syncMode) {
+ // Targets already in the source table's format need no format conversion, so they are skipped.
+ Map<String, ConversionTarget> conversionTargetByFormat =
+ config.getTargetTables().stream()
+ .filter(
+ targetTable ->
+
!targetTable.getFormatName().equals(config.getSourceTable().getFormatName()))
+ .collect(
+ Collectors.toMap(
+ TargetTable::getFormatName,
+ targetTable ->
conversionTargetFactory.createForFormat(targetTable, conf)));
+
+ Map<String, Optional<TableSyncMetadata>> lastSyncMetadataByFormat =
+ conversionTargetByFormat.entrySet().stream()
+ .collect(
+ Collectors.toMap(Map.Entry::getKey, entry ->
entry.getValue().getTableMetadata()));
+ Map<String, ConversionTarget> formatsToSyncIncrementally =
+ getFormatsToSyncIncrementally(
+ syncMode,
+ conversionTargetByFormat,
+ lastSyncMetadataByFormat,
+ source.getConversionSource());
+ Map<String, ConversionTarget> formatsToSyncBySnapshot =
+ conversionTargetByFormat.entrySet().stream()
+ .filter(entry ->
!formatsToSyncIncrementally.containsKey(entry.getKey()))
+ .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+ SyncResultForTableFormats syncResultForSnapshotSync =
+ formatsToSyncBySnapshot.isEmpty()
+ ? SyncResultForTableFormats.builder().build()
+ : syncSnapshot(formatsToSyncBySnapshot, source);
+ SyncResultForTableFormats syncResultForIncrementalSync =
+ formatsToSyncIncrementally.isEmpty()
+ ? SyncResultForTableFormats.builder().build()
+ : syncIncrementalChanges(formatsToSyncIncrementally,
lastSyncMetadataByFormat, source);
+ // Snapshot results are applied last, so they win if a format appears in both result maps.
+ Map<String, SyncResult> syncResultsMerged =
+ new HashMap<>(syncResultForIncrementalSync.getLastSyncResult());
+ syncResultsMerged.putAll(syncResultForSnapshotSync.getLastSyncResult());
+ String successfulSyncs =
+ getFormatsWithStatusCode(syncResultsMerged,
SyncResult.SyncStatusCode.SUCCESS);
+ if (!successfulSyncs.isEmpty()) {
+ log.info("Sync is successful for the following formats {}",
successfulSyncs);
+ }
+ String failedSyncs =
+ getFormatsWithStatusCode(syncResultsMerged,
SyncResult.SyncStatusCode.ERROR);
+ if (!failedSyncs.isEmpty()) {
+ log.error("Sync failed for the following formats {}", failedSyncs);
+ }
+ return syncResultsMerged;
+ }
+
+  /**
+   * Synchronizes the target table to multiple target catalogs.
+   *
+   * <p>The latest state of the target table is read through a {@link ConversionSource} for its own
+   * table format and then synced to every catalog client. That conversion source is closed before
+   * this method returns.
+   *
+   * @param targetTable target table that needs to be synced.
+   * @param catalogSyncClients catalog sync clients keyed by the table identifier to use in each
+   *     target catalog.
+   * @param conversionSourceProvider a provider of {@link ConversionSource} instances for the table
+   *     format of targetTable.
+   * @return the result of syncing the target table to the catalogs.
+   * @throws ReadException if the conversion source for the target table cannot be closed.
+   */
+  private SyncResult syncCatalogsForTargetTable(
+      TargetTable targetTable,
+      Map<CatalogTableIdentifier, CatalogSyncClient> catalogSyncClients,
+      ConversionSourceProvider conversionSourceProvider) {
+    // The conversion source was previously never closed; try-with-resources releases it.
+    try (ConversionSource targetConversionSource =
+        conversionSourceProvider.getConversionSourceInstance(convertToSourceTable(targetTable))) {
+      // Get the latest state of InternalTable for the target table, then sync it to the catalogs.
+      return catalogSync.syncTable(catalogSyncClients, targetConversionSource.getCurrentTable());
+    } catch (IOException ioException) {
+      throw new ReadException("Failed to close source converter", ioException);
+    }
+  }
+
private static String getFormatsWithStatusCode(
Map<String, SyncResult> syncResultsMerged, SyncResult.SyncStatusCode
statusCode) {
return syncResultsMerged.entrySet().stream()
@@ -149,11 +251,11 @@ public class ConversionController {
}
private <COMMIT> Map<String, ConversionTarget> getFormatsToSyncIncrementally(
- ConversionConfig conversionConfig,
+ SyncMode syncMode,
Map<String, ConversionTarget> conversionTargetByFormat,
Map<String, Optional<TableSyncMetadata>> lastSyncMetadataByFormat,
ConversionSource<COMMIT> conversionSource) {
- if (conversionConfig.getSyncMode() == SyncMode.FULL) {
+ if (syncMode == SyncMode.FULL) {
// Full sync requested by config, hence no incremental sync.
return Collections.emptyMap();
}
@@ -268,6 +370,22 @@ public class ConversionController {
.build();
}
+  /**
+   * Merges catalog sync results into the table-format sync results, keyed by table format.
+   *
+   * <p>If a format has both results, the sync durations are summed and the catalog sync statuses
+   * are taken from the catalog result. Otherwise the catalog result is added as-is.
+   *
+   * @param syncResultsMerged table-format sync results; updated in place.
+   * @param catalogSyncResults catalog sync results for each table format.
+   */
+  private void mergeSyncResults(
+      Map<String, SyncResult> syncResultsMerged, Map<String, SyncResult> catalogSyncResults) {
+    catalogSyncResults.forEach(
+        (tableFormat, catalogSyncResult) ->
+            syncResultsMerged.merge(
+                tableFormat,
+                catalogSyncResult,
+                (formatSyncResult, catalogResult) ->
+                    formatSyncResult.toBuilder()
+                        .syncDuration(
+                            formatSyncResult
+                                .getSyncDuration()
+                                .plus(catalogResult.getSyncDuration()))
+                        .catalogSyncStatusList(catalogResult.getCatalogSyncStatusList())
+                        .build()));
+  }
+
@Value
@Builder
private static class SyncResultForTableFormats {
diff --git
a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergCatalogConfig.java
b/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionUtils.java
similarity index 65%
copy from
xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergCatalogConfig.java
copy to
xtable-core/src/main/java/org/apache/xtable/conversion/ConversionUtils.java
index d5d7a3c5..f21be670 100644
---
a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergCatalogConfig.java
+++
b/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionUtils.java
@@ -16,21 +16,18 @@
* limitations under the License.
*/
-package org.apache.xtable.iceberg;
+package org.apache.xtable.conversion;
-import java.util.Collections;
-import java.util.Map;
+/** Static helper methods used by the conversion code. */
+// NOTE(review): utility class without a private constructor; consider adding one (e.g. Lombok
+// @NoArgsConstructor(access = AccessLevel.PRIVATE)) if nothing instantiates it.
+public class ConversionUtils {
-import lombok.Builder;
-import lombok.NonNull;
-import lombok.Value;
-
-import org.apache.xtable.conversion.CatalogConfig;
-
-@Value
-@Builder
-public class IcebergCatalogConfig implements CatalogConfig {
- @NonNull String catalogImpl;
- @NonNull String catalogName;
- @NonNull @Builder.Default Map<String, String> catalogOptions =
Collections.emptyMap();
+ /**
+ * Returns a {@link SourceTable} describing the same table as the given {@link TargetTable}, e.g.
+ * so the target table can be read back through a conversion source. The target's base path is
+ * passed for both path arguments of the SourceTable constructor (presumably base path and data
+ * path; confirm against SourceTable).
+ */
+ public static SourceTable convertToSourceTable(TargetTable table) {
+ return new SourceTable(
+ table.getName(),
+ table.getFormatName(),
+ table.getBasePath(),
+ table.getBasePath(),
+ table.getNamespace(),
+ table.getCatalogConfig(),
+ table.getAdditionalProperties());
+ }
}
diff --git
a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergCatalogConfig.java
b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergCatalogConfig.java
index d5d7a3c5..e0ec7762 100644
---
a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergCatalogConfig.java
+++
b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergCatalogConfig.java
@@ -27,10 +27,16 @@ import lombok.Value;
import org.apache.xtable.conversion.CatalogConfig;
+/**
+ * Catalog configuration for Iceberg. Iceberg requires a catalog to perform any operation; if no
+ * catalog is provided, the default catalog (HadoopCatalog or storage-based catalog) is used. To
+ * sync Iceberg tables to multiple catalogs, use {@link
+ * org.apache.xtable.conversion.ExternalCatalogConfig} instead, which allows syncing the latest
+ * version of the Iceberg metadata to multiple catalogs.
+ */
@Value
@Builder
public class IcebergCatalogConfig implements CatalogConfig {
- @NonNull String catalogImpl;
@NonNull String catalogName;
+ // NOTE(review): moving this field changes the parameter order of the Lombok-generated
+ // all-args constructor (both parameters are Strings); confirm nothing calls it positionally.
+ @NonNull String catalogImpl;
@NonNull @Builder.Default Map<String, String> catalogOptions =
Collections.emptyMap();
}
diff --git
a/xtable-core/src/test/java/org/apache/xtable/catalog/TestCatalogConversionFactory.java
b/xtable-core/src/test/java/org/apache/xtable/catalog/TestCatalogConversionFactory.java
new file mode 100644
index 00000000..1d05666b
--- /dev/null
+++
b/xtable-core/src/test/java/org/apache/xtable/catalog/TestCatalogConversionFactory.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.xtable.catalog;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.util.Collections;
+
+import org.apache.hadoop.conf.Configuration;
+import org.junit.jupiter.api.Test;
+
+import org.apache.xtable.conversion.ExternalCatalogConfig;
+import org.apache.xtable.conversion.TargetCatalogConfig;
+import org.apache.xtable.model.catalog.ThreePartHierarchicalTableIdentifier;
+import org.apache.xtable.spi.extractor.CatalogConversionSource;
+import org.apache.xtable.spi.sync.CatalogSyncClient;
+import org.apache.xtable.testutil.ITTestUtils;
+import org.apache.xtable.testutil.ITTestUtils.TestCatalogConversionSourceImpl;
+import org.apache.xtable.testutil.ITTestUtils.TestCatalogSyncImpl;
+
+/**
+ * Tests for {@link CatalogConversionFactory}. Each case checks that the factory instantiates the
+ * test implementations from {@link ITTestUtils}, either from an explicit implementation class name
+ * or from {@link ITTestUtils#TEST_CATALOG_TYPE} (presumably resolved through the {@code
+ * META-INF/services} registrations under test resources).
+ */
+class TestCatalogConversionFactory {
+
+  @Test
+  void createCatalogConversionSource() {
+    assertCreatesTestConversionSource(
+        ExternalCatalogConfig.builder()
+            .catalogId("catalogId")
+            .catalogConversionSourceImpl(TestCatalogConversionSourceImpl.class.getName())
+            .catalogProperties(Collections.emptyMap())
+            .build());
+  }
+
+  @Test
+  void createCatalogConversionSourceForCatalogType() {
+    assertCreatesTestConversionSource(
+        ExternalCatalogConfig.builder()
+            .catalogId("catalogId")
+            .catalogType(ITTestUtils.TEST_CATALOG_TYPE)
+            .catalogProperties(Collections.emptyMap())
+            .build());
+  }
+
+  @Test
+  void createCatalogSyncClient() {
+    assertCreatesTestSyncClient(
+        targetCatalogConfig(
+            ExternalCatalogConfig.builder()
+                .catalogId("catalogId")
+                .catalogSyncClientImpl(TestCatalogSyncImpl.class.getName())
+                .catalogProperties(Collections.emptyMap())
+                .build()));
+  }
+
+  @Test
+  void createCatalogSyncClientForCatalogType() {
+    assertCreatesTestSyncClient(
+        targetCatalogConfig(
+            ExternalCatalogConfig.builder()
+                .catalogId("catalogId")
+                .catalogType(ITTestUtils.TEST_CATALOG_TYPE)
+                .catalogProperties(Collections.emptyMap())
+                .build()));
+  }
+
+  /** Creates a conversion source for {@code sourceCatalog} and asserts it is the test impl. */
+  private static void assertCreatesTestConversionSource(ExternalCatalogConfig sourceCatalog) {
+    CatalogConversionSource catalogConversionSource =
+        CatalogConversionFactory.createCatalogConversionSource(sourceCatalog, new Configuration());
+    // assertEquals takes (expected, actual); keep that order so failure messages read correctly.
+    assertEquals(
+        TestCatalogConversionSourceImpl.class.getName(),
+        catalogConversionSource.getClass().getName());
+  }
+
+  /** Creates a sync client for {@code targetCatalogConfig} and asserts it is the test impl. */
+  private static void assertCreatesTestSyncClient(TargetCatalogConfig targetCatalogConfig) {
+    CatalogSyncClient catalogSyncClient =
+        CatalogConversionFactory.getInstance()
+            .createCatalogSyncClient(
+                targetCatalogConfig.getCatalogConfig(), "TABLE_FORMAT", new Configuration());
+    assertEquals(TestCatalogSyncImpl.class.getName(), catalogSyncClient.getClass().getName());
+  }
+
+  /** Wraps {@code catalogConfig} in a target catalog config for a fixed test table identifier. */
+  private static TargetCatalogConfig targetCatalogConfig(ExternalCatalogConfig catalogConfig) {
+    return TargetCatalogConfig.builder()
+        .catalogConfig(catalogConfig)
+        .catalogTableIdentifier(
+            new ThreePartHierarchicalTableIdentifier("target-database", "target-tableName"))
+        .build();
+  }
+}
diff --git
a/xtable-core/src/test/java/org/apache/xtable/conversion/TestConversionController.java
b/xtable-core/src/test/java/org/apache/xtable/conversion/TestConversionController.java
index caba8046..0f34103e 100644
---
a/xtable-core/src/test/java/org/apache/xtable/conversion/TestConversionController.java
+++
b/xtable-core/src/test/java/org/apache/xtable/conversion/TestConversionController.java
@@ -18,8 +18,12 @@
package org.apache.xtable.conversion;
+import static
org.apache.xtable.conversion.ConversionUtils.convertToSourceTable;
+import static org.apache.xtable.model.storage.TableFormat.DELTA;
import static org.apache.xtable.model.storage.TableFormat.HUDI;
+import static org.apache.xtable.model.storage.TableFormat.ICEBERG;
import static org.junit.jupiter.api.Assertions.*;
+import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
@@ -36,6 +40,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -43,17 +48,23 @@ import org.apache.hadoop.conf.Configuration;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentMatcher;
+import com.google.common.collect.ImmutableMap;
+
+import org.apache.xtable.catalog.CatalogConversionFactory;
import org.apache.xtable.model.CommitsBacklog;
import org.apache.xtable.model.IncrementalTableChanges;
import org.apache.xtable.model.InstantsForIncrementalSync;
import org.apache.xtable.model.InternalSnapshot;
import org.apache.xtable.model.InternalTable;
import org.apache.xtable.model.TableChange;
+import org.apache.xtable.model.catalog.ThreePartHierarchicalTableIdentifier;
import org.apache.xtable.model.metadata.TableSyncMetadata;
import org.apache.xtable.model.storage.TableFormat;
import org.apache.xtable.model.sync.SyncMode;
import org.apache.xtable.model.sync.SyncResult;
import org.apache.xtable.spi.extractor.ConversionSource;
+import org.apache.xtable.spi.sync.CatalogSync;
+import org.apache.xtable.spi.sync.CatalogSyncClient;
import org.apache.xtable.spi.sync.ConversionTarget;
import org.apache.xtable.spi.sync.TableFormatSync;
@@ -62,12 +73,22 @@ public class TestConversionController {
private final Configuration mockConf = mock(Configuration.class);
private final ConversionSourceProvider<Instant> mockConversionSourceProvider
=
mock(ConversionSourceProvider.class);
+ private final ConversionSourceProvider<Instant>
mockConversionSourceProvider2 =
+ mock(ConversionSourceProvider.class);
+ private final ConversionSourceProvider<Instant>
mockConversionSourceProvider3 =
+ mock(ConversionSourceProvider.class);
+
private final ConversionSource<Instant> mockConversionSource =
mock(ConversionSource.class);
private final ConversionTargetFactory mockConversionTargetFactory =
mock(ConversionTargetFactory.class);
+ private final CatalogConversionFactory mockCatalogConversionFactory =
+ mock(CatalogConversionFactory.class);
private final TableFormatSync tableFormatSync = mock(TableFormatSync.class);
+ private final CatalogSync catalogSync = mock(CatalogSync.class);
private final ConversionTarget mockConversionTarget1 =
mock(ConversionTarget.class);
private final ConversionTarget mockConversionTarget2 =
mock(ConversionTarget.class);
+ private final CatalogSyncClient mockCatalogSyncClient1 =
mock(CatalogSyncClient.class);
+ private final CatalogSyncClient mockCatalogSyncClient2 =
mock(CatalogSyncClient.class);
@Test
void testAllSnapshotSyncAsPerConfig() {
@@ -96,7 +117,12 @@ public class TestConversionController {
eq(internalSnapshot)))
.thenReturn(perTableResults);
ConversionController conversionController =
- new ConversionController(mockConf, mockConversionTargetFactory,
tableFormatSync);
+ new ConversionController(
+ mockConf,
+ mockConversionTargetFactory,
+ mockCatalogConversionFactory,
+ tableFormatSync,
+ catalogSync);
Map<String, SyncResult> result =
conversionController.sync(conversionConfig,
mockConversionSourceProvider);
assertEquals(perTableResults, result);
@@ -182,7 +208,12 @@ public class TestConversionController {
expectedSyncResult.put(TableFormat.ICEBERG,
getLastSyncResult(icebergSyncResults));
expectedSyncResult.put(TableFormat.DELTA,
getLastSyncResult(deltaSyncResults));
ConversionController conversionController =
- new ConversionController(mockConf, mockConversionTargetFactory,
tableFormatSync);
+ new ConversionController(
+ mockConf,
+ mockConversionTargetFactory,
+ mockCatalogConversionFactory,
+ tableFormatSync,
+ catalogSync);
Map<String, SyncResult> result =
conversionController.sync(conversionConfig,
mockConversionSourceProvider);
assertEquals(expectedSyncResult, result);
@@ -226,7 +257,12 @@ public class TestConversionController {
eq(internalSnapshot)))
.thenReturn(syncResults);
ConversionController conversionController =
- new ConversionController(mockConf, mockConversionTargetFactory,
tableFormatSync);
+ new ConversionController(
+ mockConf,
+ mockConversionTargetFactory,
+ mockCatalogConversionFactory,
+ tableFormatSync,
+ catalogSync);
Map<String, SyncResult> result =
conversionController.sync(conversionConfig,
mockConversionSourceProvider);
assertEquals(syncResults, result);
@@ -310,7 +346,12 @@ public class TestConversionController {
expectedSyncResult.put(TableFormat.ICEBERG, syncResult);
expectedSyncResult.put(TableFormat.DELTA,
getLastSyncResult(deltaSyncResults));
ConversionController conversionController =
- new ConversionController(mockConf, mockConversionTargetFactory,
tableFormatSync);
+ new ConversionController(
+ mockConf,
+ mockConversionTargetFactory,
+ mockCatalogConversionFactory,
+ tableFormatSync,
+ catalogSync);
Map<String, SyncResult> result =
conversionController.sync(conversionConfig,
mockConversionSourceProvider);
assertEquals(expectedSyncResult, result);
@@ -368,16 +409,104 @@ public class TestConversionController {
// Iceberg and Delta have no commits to sync
Map<String, SyncResult> expectedSyncResult = Collections.emptyMap();
ConversionController conversionController =
- new ConversionController(mockConf, mockConversionTargetFactory,
tableFormatSync);
+ new ConversionController(
+ mockConf,
+ mockConversionTargetFactory,
+ mockCatalogConversionFactory,
+ tableFormatSync,
+ catalogSync);
Map<String, SyncResult> result =
conversionController.sync(conversionConfig,
mockConversionSourceProvider);
assertEquals(expectedSyncResult, result);
}
+ /**
+ * Syncs a table configured with ICEBERG and DELTA targets, each registered in two target
+ * catalogs, through syncTableAcrossCatalogs. Asserts that every format's result equals the
+ * mocked table format sync result with its duration set to 4s (the mocks use 1s for the table
+ * format sync and 3s for the catalog sync).
+ */
+ @Test
+ void testNoTableFormatConversionWithMultipleCatalogSync() {
+ SyncMode syncMode = SyncMode.INCREMENTAL;
+ List<TargetCatalogConfig> targetCatalogs =
+ Arrays.asList(getTargetCatalog("1"), getTargetCatalog("2"));
+ InternalTable internalTable = getInternalTable();
+ InternalSnapshot internalSnapshot = buildSnapshot(internalTable, "v1");
+ // Conversion source and target mocks.
+ ConversionConfig conversionConfig =
+ getTableSyncConfig(
+ Arrays.asList(TableFormat.ICEBERG, TableFormat.DELTA), syncMode,
targetCatalogs);
+ when(mockConversionSourceProvider.getConversionSourceInstance(
+ conversionConfig.getSourceTable()))
+ .thenReturn(mockConversionSource);
+ // Target tables re-read as sources (via convertToSourceTable) resolve to the same mock source.
+ when(mockConversionSourceProvider.getConversionSourceInstance(
+ convertToSourceTable(conversionConfig.getTargetTables().get(0))))
+ .thenReturn(mockConversionSource);
+ when(mockConversionSourceProvider.getConversionSourceInstance(
+ convertToSourceTable(conversionConfig.getTargetTables().get(1))))
+ .thenReturn(mockConversionSource);
+ when(mockConversionTargetFactory.createForFormat(
+ conversionConfig.getTargetTables().get(0), mockConf))
+ .thenReturn(mockConversionTarget1);
+ when(mockConversionTargetFactory.createForFormat(
+ conversionConfig.getTargetTables().get(1), mockConf))
+ .thenReturn(mockConversionTarget2);
+
when(mockConversionSource.getCurrentSnapshot()).thenReturn(internalSnapshot);
+
when(mockConversionSource.getCurrentTable()).thenReturn(getInternalTable());
+ // Mocks for tableFormatSync.
+ Instant instantBeforeHour = Instant.now().minus(Duration.ofHours(1));
+ Instant syncStartTime = Instant.now();
+ SyncResult syncResult =
+ buildSyncResult(syncMode, instantBeforeHour, syncStartTime,
Duration.ofSeconds(1));
+ Map<String, SyncResult> tableFormatSyncResults =
+ buildPerTableResult(Arrays.asList(ICEBERG, DELTA), syncResult);
+ when(tableFormatSync.syncSnapshot(
+ argThat(containsAll(Arrays.asList(mockConversionTarget1,
mockConversionTarget2))),
+ eq(internalSnapshot)))
+ .thenReturn(tableFormatSyncResults);
+ // Mocks for catalogSync.
+ when(mockCatalogConversionFactory.createCatalogSyncClient(
+ eq(targetCatalogs.get(0).getCatalogConfig()), any(), eq(mockConf)))
+ .thenReturn(mockCatalogSyncClient1);
+ when(mockCatalogConversionFactory.createCatalogSyncClient(
+ eq(targetCatalogs.get(1).getCatalogConfig()), any(), eq(mockConf)))
+ .thenReturn(mockCatalogSyncClient2);
+ // catalogSync is stubbed to expect both sync clients keyed by their catalog table identifiers.
+ // NOTE(review): the stubbed result passes (syncStartTime, instantBeforeHour), the reverse of
+ // syncResult above; only its 3s duration appears to influence the assertion - confirm intent.
+ when(catalogSync.syncTable(
+ eq(
+ ImmutableMap.of(
+ targetCatalogs.get(0).getCatalogTableIdentifier(),
mockCatalogSyncClient1,
+ targetCatalogs.get(1).getCatalogTableIdentifier(),
mockCatalogSyncClient2)),
+ any()))
+ .thenReturn(
+ buildSyncResult(syncMode, syncStartTime, instantBeforeHour,
Duration.ofSeconds(3)));
+ ConversionController conversionController =
+ new ConversionController(
+ mockConf,
+ mockConversionTargetFactory,
+ mockCatalogConversionFactory,
+ tableFormatSync,
+ catalogSync);
+ // Mocks for conversionSourceProviders.
+ Map<String, ConversionSourceProvider> conversionSourceProviders = new
HashMap<>();
+ conversionSourceProviders.put(HUDI, mockConversionSourceProvider);
+ conversionSourceProviders.put(ICEBERG, mockConversionSourceProvider);
+ conversionSourceProviders.put(DELTA, mockConversionSourceProvider);
+ // Assert results: each format is expected to report a 4s duration (1s table format sync and
+ // 3s catalog sync in the mocks above).
+ Map<String, SyncResult> mergedSyncResults =
+ buildPerTableResult(
+ Arrays.asList(ICEBERG, DELTA),
+
syncResult.toBuilder().syncDuration(Duration.ofSeconds(4)).build());
+ Map<String, SyncResult> result =
+ conversionController.syncTableAcrossCatalogs(conversionConfig,
conversionSourceProviders);
+ assertEquals(mergedSyncResults, result);
+ }
+
private SyncResult getLastSyncResult(List<SyncResult> syncResults) {
return syncResults.get(syncResults.size() - 1);
}
+  /**
+   * Maps every format in {@code tableFormats} to the same {@code syncResult}; a repeated format
+   * simply overwrites its earlier entry.
+   */
+  private Map<String, SyncResult> buildPerTableResult(
+      List<String> tableFormats, SyncResult syncResult) {
+    Map<String, SyncResult> resultsByFormat = new HashMap<>();
+    for (String format : tableFormats) {
+      resultsByFormat.put(format, syncResult);
+    }
+    return resultsByFormat;
+  }
+
private List<SyncResult> buildSyncResults(List<Instant> instantList) {
return instantList.stream()
.map(instant -> buildSyncResult(SyncMode.INCREMENTAL, instant))
@@ -396,6 +525,17 @@ public class TestConversionController {
.build();
}
+  /**
+   * Builds a successful {@link SyncResult} with an explicit start time, last synced instant and
+   * duration.
+   */
+  private SyncResult buildSyncResult(
+      SyncMode syncMode, Instant syncStartTime, Instant lastSyncedInstant, Duration duration) {
+    return SyncResult.builder()
+        .syncStartTime(syncStartTime)
+        .lastInstantSynced(lastSyncedInstant)
+        .syncDuration(duration)
+        .mode(syncMode)
+        .tableFormatSyncStatus(SyncResult.SyncStatus.SUCCESS)
+        .build();
+  }
+
private InternalSnapshot buildSnapshot(InternalTable internalTable, String
version) {
return
InternalSnapshot.builder().table(internalTable).version(version).build();
}
@@ -413,6 +553,13 @@ public class TestConversionController {
}
private ConversionConfig getTableSyncConfig(List<String> targetTableFormats,
SyncMode syncMode) {
+ return getTableSyncConfig(targetTableFormats, syncMode,
Collections.emptyList());
+ }
+
+ private ConversionConfig getTableSyncConfig(
+ List<String> targetTableFormats,
+ SyncMode syncMode,
+ List<TargetCatalogConfig> targetCatalogs) {
SourceTable sourceTable =
SourceTable.builder()
.name("tablename")
@@ -434,10 +581,27 @@ public class TestConversionController {
return ConversionConfig.builder()
.sourceTable(sourceTable)
.targetTables(targetTables)
+ .targetCatalogs(
+ targetTables.stream()
+ .collect(Collectors.toMap(Function.identity(), k ->
targetCatalogs)))
.syncMode(syncMode)
.build();
}
+  /**
+   * Builds a target catalog config whose catalog id, sync client impl name and table identifier
+   * all end with {@code suffix}, so configs built with different suffixes are distinct.
+   */
+  private TargetCatalogConfig getTargetCatalog(String suffix) {
+    ExternalCatalogConfig externalCatalog =
+        ExternalCatalogConfig.builder()
+            .catalogId("catalogId-" + suffix)
+            .catalogSyncClientImpl("catalogImpl-" + suffix)
+            .catalogProperties(Collections.emptyMap())
+            .build();
+    ThreePartHierarchicalTableIdentifier tableIdentifier =
+        new ThreePartHierarchicalTableIdentifier(
+            "target-database" + suffix, "target-tableName" + suffix);
+    return TargetCatalogConfig.builder()
+        .catalogConfig(externalCatalog)
+        .catalogTableIdentifier(tableIdentifier)
+        .build();
+  }
+
private static <T> ArgumentMatcher<Collection<T>> containsAll(Collection<T>
expected) {
return actual -> actual.size() == expected.size() &&
actual.containsAll(expected);
}
diff --git
a/xtable-core/src/test/java/org/apache/xtable/testutil/ITTestUtils.java
b/xtable-core/src/test/java/org/apache/xtable/testutil/ITTestUtils.java
index 281e61fe..ce374f39 100644
--- a/xtable-core/src/test/java/org/apache/xtable/testutil/ITTestUtils.java
+++ b/xtable-core/src/test/java/org/apache/xtable/testutil/ITTestUtils.java
@@ -18,16 +18,25 @@
package org.apache.xtable.testutil;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
import org.junit.jupiter.api.Assertions;
+import org.apache.xtable.conversion.ExternalCatalogConfig;
+import org.apache.xtable.conversion.SourceTable;
import org.apache.xtable.model.InternalTable;
+import org.apache.xtable.model.catalog.CatalogTableIdentifier;
import org.apache.xtable.model.schema.InternalPartitionField;
import org.apache.xtable.model.schema.InternalSchema;
import org.apache.xtable.model.storage.DataLayoutStrategy;
+import org.apache.xtable.spi.extractor.CatalogConversionSource;
+import org.apache.xtable.spi.sync.CatalogSyncClient;
public class ITTestUtils {
+ public static final String TEST_CATALOG_TYPE = "test";
public static void validateTable(
InternalTable internalTable,
@@ -44,4 +53,103 @@ public class ITTestUtils {
Assertions.assertEquals(basePath, internalTable.getBasePath());
Assertions.assertEquals(partitioningFields,
internalTable.getPartitioningFields());
}
+
+  /**
+   * Stub {@link CatalogSyncClient} for tests. It performs no catalog work. Every overridden method
+   * other than {@link #getCatalogType()} counts its own invocation in a static, class-wide map
+   * that tests can inspect through {@link #getFunctionCalls()}.
+   */
+  public static class TestCatalogSyncImpl implements CatalogSyncClient {
+    /** Invocation counts keyed by method name, shared by every instance of this class. */
+    private static final Map<String, Integer> FUNCTION_CALLS = new HashMap<>();
+
+    // Accepts (catalog config, table format, hadoop conf); all arguments are ignored.
+    public TestCatalogSyncImpl(
+        ExternalCatalogConfig catalogConfig, String tableFormat, Configuration hadoopConf) {}
+
+    public TestCatalogSyncImpl() {}
+
+    @Override
+    public String getCatalogType() {
+      return TEST_CATALOG_TYPE;
+    }
+
+    @Override
+    public String getCatalogId() {
+      trackFunctionCall();
+      return null;
+    }
+
+    @Override
+    public String getStorageLocation(Object o) {
+      trackFunctionCall();
+      return null;
+    }
+
+    @Override
+    public boolean hasDatabase(CatalogTableIdentifier tableIdentifier) {
+      trackFunctionCall();
+      return false;
+    }
+
+    @Override
+    public void createDatabase(CatalogTableIdentifier tableIdentifier) {
+      trackFunctionCall();
+    }
+
+    @Override
+    public Object getTable(CatalogTableIdentifier tableIdentifier) {
+      trackFunctionCall();
+      return null;
+    }
+
+    @Override
+    public void createTable(InternalTable table, CatalogTableIdentifier tableIdentifier) {
+      trackFunctionCall();
+    }
+
+    @Override
+    public void refreshTable(
+        InternalTable table, Object catalogTable, CatalogTableIdentifier tableIdentifier) {
+      trackFunctionCall();
+    }
+
+    @Override
+    public void createOrReplaceTable(InternalTable table, CatalogTableIdentifier tableIdentifier) {
+      trackFunctionCall();
+    }
+
+    @Override
+    public void dropTable(InternalTable table, CatalogTableIdentifier tableIdentifier) {
+      trackFunctionCall();
+    }
+
+    @Override
+    public void close() throws Exception {
+      trackFunctionCall();
+    }
+
+    /** Increments the counter for whichever overridden method invoked this helper directly. */
+    private void trackFunctionCall() {
+      // Frame [0] is Thread.getStackTrace, [1] is this helper, [2] is the calling method.
+      StackTraceElement[] frames = Thread.currentThread().getStackTrace();
+      FUNCTION_CALLS.merge(frames[2].getMethodName(), 1, Integer::sum);
+    }
+
+    /** Returns the live, mutable map of invocation counts keyed by method name. */
+    public static Map<String, Integer> getFunctionCalls() {
+      return FUNCTION_CALLS;
+    }
+  }
+
+  /**
+   * Stub {@link CatalogConversionSource} for tests. It ignores its constructor arguments and the
+   * requested identifier, and always describes the same fixed ICEBERG source table.
+   */
+  public static class TestCatalogConversionSourceImpl implements CatalogConversionSource {
+    public TestCatalogConversionSourceImpl(
+        ExternalCatalogConfig sourceCatalogConfig, Configuration configuration) {}
+
+    public TestCatalogConversionSourceImpl() {}
+
+    @Override
+    public String getCatalogType() {
+      return TEST_CATALOG_TYPE;
+    }
+
+    @Override
+    public SourceTable getSourceTable(CatalogTableIdentifier tableIdentifier) {
+      return SourceTable.builder()
+          .formatName("ICEBERG")
+          .name("source_table_name")
+          .basePath("file://base_path/v1/")
+          .build();
+    }
+  }
}
diff --git
a/xtable-core/src/test/resources/META-INF/services/org.apache.xtable.spi.extractor.CatalogConversionSource
b/xtable-core/src/test/resources/META-INF/services/org.apache.xtable.spi.extractor.CatalogConversionSource
new file mode 100644
index 00000000..ceb8a2ff
--- /dev/null
+++
b/xtable-core/src/test/resources/META-INF/services/org.apache.xtable.spi.extractor.CatalogConversionSource
@@ -0,0 +1,18 @@
+##########################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##########################################################################
+org.apache.xtable.testutil.ITTestUtils$TestCatalogConversionSourceImpl
\ No newline at end of file
diff --git
a/xtable-core/src/test/resources/META-INF/services/org.apache.xtable.spi.sync.CatalogSyncClient
b/xtable-core/src/test/resources/META-INF/services/org.apache.xtable.spi.sync.CatalogSyncClient
new file mode 100644
index 00000000..4e571e3e
--- /dev/null
+++
b/xtable-core/src/test/resources/META-INF/services/org.apache.xtable.spi.sync.CatalogSyncClient
@@ -0,0 +1,18 @@
+##########################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##########################################################################
+org.apache.xtable.testutil.ITTestUtils$TestCatalogSyncImpl
\ No newline at end of file
diff --git a/xtable-utilities/pom.xml b/xtable-utilities/pom.xml
index bc91f99e..979cb456 100644
--- a/xtable-utilities/pom.xml
+++ b/xtable-utilities/pom.xml
@@ -35,6 +35,15 @@
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.xtable</groupId>
+ <artifactId>xtable-core_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <classifier>tests</classifier>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
<!-- command line arg parsing -->
<dependency>
<groupId>commons-cli</groupId>
@@ -125,6 +134,11 @@
<artifactId>junit-jupiter-engine</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.hudi</groupId>
+ <artifactId>hudi-spark${spark.version.prefix}-bundle_${scala.binary.version}</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
diff --git
a/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunCatalogSync.java
b/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunCatalogSync.java
new file mode 100644
index 00000000..60a62cd9
--- /dev/null
+++
b/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunCatalogSync.java
@@ -0,0 +1,365 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.xtable.utilities;
+
+import static org.apache.xtable.utilities.RunSync.getCustomConfigurations;
+import static org.apache.xtable.utilities.RunSync.loadHadoopConf;
+import static
org.apache.xtable.utilities.RunSync.loadTableFormatConversionConfigs;
+
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import lombok.Builder;
+import lombok.Value;
+import lombok.extern.jackson.Jacksonized;
+import lombok.extern.log4j.Log4j2;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.DefaultParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.hadoop.conf.Configuration;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
+
+import org.apache.xtable.catalog.CatalogConversionFactory;
+import org.apache.xtable.conversion.ConversionConfig;
+import org.apache.xtable.conversion.ConversionController;
+import org.apache.xtable.conversion.ConversionSourceProvider;
+import org.apache.xtable.conversion.ExternalCatalogConfig;
+import org.apache.xtable.conversion.SourceTable;
+import org.apache.xtable.conversion.TargetCatalogConfig;
+import org.apache.xtable.conversion.TargetTable;
+import org.apache.xtable.hudi.HudiSourceConfig;
+import org.apache.xtable.model.catalog.CatalogTableIdentifier;
+import org.apache.xtable.model.catalog.HierarchicalTableIdentifier;
+import org.apache.xtable.model.catalog.ThreePartHierarchicalTableIdentifier;
+import org.apache.xtable.model.storage.CatalogType;
+import org.apache.xtable.model.sync.SyncMode;
+import org.apache.xtable.reflection.ReflectionUtils;
+import org.apache.xtable.spi.extractor.CatalogConversionSource;
+import
org.apache.xtable.utilities.RunCatalogSync.DatasetConfig.StorageIdentifier;
+import
org.apache.xtable.utilities.RunCatalogSync.DatasetConfig.TableIdentifier;
+import
org.apache.xtable.utilities.RunCatalogSync.DatasetConfig.TargetTableIdentifier;
+import org.apache.xtable.utilities.RunSync.TableFormatConverters;
+
+/**
+ * Provides a standalone process that reads tables from a source catalog and synchronizes their
+ * state into target tables registered in one or more target catalogs. Table format conversion is
+ * also performed when a target table uses a different format from the source table.
+ */
+@Log4j2
+public class RunCatalogSync {
+  /** Jackson mapper used to parse the YAML catalog sync configuration. */
+  public static final ObjectMapper YAML_MAPPER = new ObjectMapper(new YAMLFactory());
+  // Short option names for the command line flags declared in OPTIONS below.
+  private static final String CATALOG_SOURCE_AND_TARGET_CONFIG_PATH = "catalogConfig";
+  private static final String HADOOP_CONFIG_PATH = "hadoopConfig";
+  private static final String CONVERTERS_CONFIG_PATH = "convertersConfig";
+  private static final String HELP_OPTION = "h";
+  // NOTE(review): presumably a cache filled by getConversionSourceProviders (outside this view).
+  private static final Map<String, ConversionSourceProvider> CONVERSION_SOURCE_PROVIDERS =
+      new HashMap<>();
+
+  /** Command line options; only the catalog sync config path is required. */
+  private static final Options OPTIONS =
+      new Options()
+          .addRequiredOption(
+              CATALOG_SOURCE_AND_TARGET_CONFIG_PATH,
+              "catalogSyncConfig",
+              true,
+              "The path to a yaml file containing source and target tables catalog configurations "
+                  + "along with the table identifiers that need to be synced")
+          .addOption(
+              HADOOP_CONFIG_PATH,
+              "hadoopConfig",
+              true,
+              "Hadoop config xml file path containing configs necessary to access the "
+                  + "file system. These configs will override the default configs.")
+          .addOption(
+              CONVERTERS_CONFIG_PATH,
+              "convertersConfig",
+              true,
+              "The path to a yaml file containing InternalTable converter configurations. "
+                  + "These configs will override the default")
+          .addOption(HELP_OPTION, "help", false, "Displays help information to run this utility");
+
+  /**
+   * Entry point. Parses the command line and loads the catalog sync YAML config plus the optional
+   * Hadoop and converter configs. It then runs an incremental sync of every configured dataset
+   * into its target catalogs. A failure while syncing one dataset is logged and does not stop the
+   * remaining datasets.
+   *
+   * @param args command line arguments (see the options declared on this class)
+   * @throws IllegalArgumentException if a dataset references an undeclared target catalog id
+   * @throws Exception if a config file cannot be read or parsed
+   */
+  public static void main(String[] args) throws Exception {
+    CommandLineParser parser = new DefaultParser();
+    CommandLine cmd;
+    try {
+      cmd = parser.parse(OPTIONS, args);
+    } catch (ParseException e) {
+      new HelpFormatter().printHelp("xtable.jar", OPTIONS, true);
+      return;
+    }
+
+    if (cmd.hasOption(HELP_OPTION)) {
+      HelpFormatter formatter = new HelpFormatter();
+      formatter.printHelp("RunCatalogSync", OPTIONS);
+      return;
+    }
+
+    DatasetConfig datasetConfig;
+    try (InputStream inputStream =
+        Files.newInputStream(
+            Paths.get(cmd.getOptionValue(CATALOG_SOURCE_AND_TARGET_CONFIG_PATH)))) {
+      datasetConfig = YAML_MAPPER.readValue(inputStream, DatasetConfig.class);
+    }
+
+    byte[] customConfig = getCustomConfigurations(cmd, HADOOP_CONFIG_PATH);
+    Configuration hadoopConf = loadHadoopConf(customConfig);
+
+    customConfig = getCustomConfigurations(cmd, CONVERTERS_CONFIG_PATH);
+    TableFormatConverters tableFormatConverters = loadTableFormatConversionConfigs(customConfig);
+
+    Map<String, ExternalCatalogConfig> catalogsById =
+        datasetConfig.getTargetCatalogs().stream()
+            .collect(Collectors.toMap(ExternalCatalogConfig::getCatalogId, Function.identity()));
+    Optional<CatalogConversionSource> catalogConversionSource =
+        getCatalogConversionSource(datasetConfig.getSourceCatalog(), hadoopConf);
+    ConversionController conversionController = new ConversionController(hadoopConf);
+    for (DatasetConfig.Dataset dataset : datasetConfig.getDatasets()) {
+      SourceTable sourceTable =
+          getSourceTable(dataset.getSourceCatalogTableIdentifier(), catalogConversionSource);
+      ConversionConfig conversionConfig =
+          buildConversionConfig(sourceTable, dataset, catalogsById);
+      // Conversion source providers are needed for the source format and every target format.
+      List<String> tableFormats =
+          Stream.concat(
+                  Stream.of(sourceTable.getFormatName()),
+                  conversionConfig.getTargetTables().stream().map(TargetTable::getFormatName))
+              .distinct()
+              .collect(Collectors.toList());
+      try {
+        conversionController.syncTableAcrossCatalogs(
+            conversionConfig,
+            getConversionSourceProviders(tableFormats, tableFormatConverters, hadoopConf));
+      } catch (Exception e) {
+        log.error("Error running sync for {}", sourceTable.getBasePath(), e);
+      }
+    }
+  }
+
+  /**
+   * Builds the incremental conversion config for one dataset. Target identifiers that produce
+   * equal {@link TargetTable}s share a single entry in the target table list. Their catalogs
+   * accumulate in that table's catalog list, so a format is never listed (and synced) twice.
+   *
+   * @throws IllegalArgumentException if a target references a catalog id that is not declared in
+   *     the config's target catalogs
+   */
+  private static ConversionConfig buildConversionConfig(
+      SourceTable sourceTable,
+      DatasetConfig.Dataset dataset,
+      Map<String, ExternalCatalogConfig> catalogsById) {
+    List<TargetTable> targetTables = new ArrayList<>();
+    Map<TargetTable, List<TargetCatalogConfig>> targetCatalogs = new HashMap<>();
+    for (TargetTableIdentifier targetCatalogTableIdentifier :
+        dataset.getTargetCatalogTableIdentifiers()) {
+      String catalogId = targetCatalogTableIdentifier.getCatalogId();
+      ExternalCatalogConfig catalogConfig = catalogsById.get(catalogId);
+      if (catalogConfig == null) {
+        throw new IllegalArgumentException(
+            "Target catalog id '" + catalogId + "' is not defined in targetCatalogs");
+      }
+      TargetTable targetTable =
+          TargetTable.builder()
+              .name(sourceTable.getName())
+              .basePath(sourceTable.getBasePath())
+              .namespace(sourceTable.getNamespace())
+              .formatName(targetCatalogTableIdentifier.getTableFormat())
+              .build();
+      List<TargetCatalogConfig> catalogsForTable = targetCatalogs.get(targetTable);
+      if (catalogsForTable == null) {
+        // First time this target table is seen: register it once in both collections.
+        catalogsForTable = new ArrayList<>();
+        targetCatalogs.put(targetTable, catalogsForTable);
+        targetTables.add(targetTable);
+      }
+      catalogsForTable.add(
+          TargetCatalogConfig.builder()
+              .catalogTableIdentifier(
+                  getCatalogTableIdentifier(targetCatalogTableIdentifier.getTableIdentifier()))
+              .catalogConfig(catalogConfig)
+              .build());
+    }
+    return ConversionConfig.builder()
+        .sourceTable(sourceTable)
+        .targetTables(targetTables)
+        .targetCatalogs(targetCatalogs)
+        .syncMode(SyncMode.INCREMENTAL)
+        .build();
+  }
+
+ static Optional<CatalogConversionSource> getCatalogConversionSource(
+ ExternalCatalogConfig sourceCatalog, Configuration hadoopConf) {
+ if (CatalogType.STORAGE.equals(sourceCatalog.getCatalogType())) {
+ return Optional.empty();
+ }
+ return Optional.of(
+ CatalogConversionFactory.createCatalogConversionSource(sourceCatalog,
hadoopConf));
+ }
+
+ static SourceTable getSourceTable(
+ DatasetConfig.SourceTableIdentifier sourceTableIdentifier,
+ Optional<CatalogConversionSource> catalogConversionSource) {
+ SourceTable sourceTable = null;
+ if (sourceTableIdentifier.getStorageIdentifier() != null) {
+ StorageIdentifier storageIdentifier =
sourceTableIdentifier.getStorageIdentifier();
+ Properties sourceProperties = new Properties();
+ if (storageIdentifier.getPartitionSpec() != null) {
+ sourceProperties.put(
+ HudiSourceConfig.PARTITION_FIELD_SPEC_CONFIG,
storageIdentifier.getPartitionSpec());
+ }
+ sourceTable =
+ SourceTable.builder()
+ .name(storageIdentifier.getTableName())
+ .basePath(storageIdentifier.getTableBasePath())
+ .namespace(
+ storageIdentifier.getNamespace() == null
+ ? null
+ : storageIdentifier.getNamespace().split("\\."))
+ .dataPath(storageIdentifier.getTableDataPath())
+ .formatName(storageIdentifier.getTableFormat())
+ .additionalProperties(sourceProperties)
+ .build();
+ } else if (catalogConversionSource.isPresent()) {
+ sourceTable =
+ catalogConversionSource
+ .get()
+ .getSourceTable(
+
getCatalogTableIdentifier(sourceTableIdentifier.getTableIdentifier()));
+ }
+ return sourceTable;
+ }
+
+ static Map<String, ConversionSourceProvider> getConversionSourceProviders(
+ List<String> tableFormats,
+ TableFormatConverters tableFormatConverters,
+ Configuration hadoopConf) {
+ for (String tableFormat : tableFormats) {
+ if (CONVERSION_SOURCE_PROVIDERS.containsKey(tableFormat)) {
+ continue;
+ }
+ TableFormatConverters.ConversionConfig sourceConversionConfig =
+ tableFormatConverters.getTableFormatConverters().get(tableFormat);
+ if (sourceConversionConfig == null) {
+ throw new IllegalArgumentException(
+ String.format(
+ "Source format %s is not supported. Known source and target
formats are %s",
+ tableFormat,
tableFormatConverters.getTableFormatConverters().keySet()));
+ }
+ String sourceProviderClass =
sourceConversionConfig.conversionSourceProviderClass;
+ ConversionSourceProvider<?> conversionSourceProvider =
+ ReflectionUtils.createInstanceOfClass(sourceProviderClass);
+ conversionSourceProvider.init(hadoopConf);
+ CONVERSION_SOURCE_PROVIDERS.put(tableFormat, conversionSourceProvider);
+ }
+ return CONVERSION_SOURCE_PROVIDERS;
+ }
+
+ /**
+ * Returns an implementation class for {@link CatalogTableIdentifier} based
on the tableIdentifier
+ * provided by user.
+ */
+ static CatalogTableIdentifier getCatalogTableIdentifier(TableIdentifier
tableIdentifier) {
+ if (tableIdentifier.getHierarchicalId() != null) {
+ return ThreePartHierarchicalTableIdentifier.fromDotSeparatedIdentifier(
+ tableIdentifier.getHierarchicalId());
+ }
+ throw new IllegalArgumentException("Invalid tableIdentifier configuration
provided");
+ }
+
+ @Value
+ @Builder
+ @Jacksonized
+ public static class DatasetConfig {
+ /**
+ * Configuration of the source catalog from which XTable will read. It
must contain all the
+ * necessary connection and access details for describing and listing
tables
+ */
+ ExternalCatalogConfig sourceCatalog;
+ /**
+ * Defines configuration one or more target catalogs, to which XTable will
write or update
+ * tables. Unlike the source, these catalogs must be writable
+ */
+ List<ExternalCatalogConfig> targetCatalogs;
+ /** A list of datasets that specify how a source table maps to one or more
target tables. */
+ List<Dataset> datasets;
+
+ /** Configuration for catalog. */
+ ExternalCatalogConfig catalogConfig;
+
+ @Value
+ @Builder
+ @Jacksonized
+ public static class Dataset {
+ /** Identifies the source table in sourceCatalog. */
+ SourceTableIdentifier sourceCatalogTableIdentifier;
+ /** A list of one or more targets that this source table should be
written to. */
+ List<TargetTableIdentifier> targetCatalogTableIdentifiers;
+ }
+
+ @Value
+ @Builder
+ @Jacksonized
+ public static class SourceTableIdentifier {
+ /** Specifies the table identifier in the source catalog. */
+ TableIdentifier tableIdentifier;
+ /**
+ * (Optional) Provides direct storage details such as a table’s base
path (like an S3
+ * location) and the partition specification. This allows reading from a
source even if it is
+ * not strictly registered in a catalog, as long as the format and
location are known
+ */
+ StorageIdentifier storageIdentifier;
+ }
+
+ @Value
+ @Builder
+ @Jacksonized
+ public static class TargetTableIdentifier {
+ /**
+ * The user defined unique identifier of the target catalog where the
table will be created or
+ * updated
+ */
+ String catalogId;
+ /**
+ * The target table format (e.g., DELTA, HUDI, ICEBERG), specifying how
the data will be
+ * stored at the target.
+ */
+ String tableFormat;
+ /** Specifies the table identifier in the target catalog. */
+ TableIdentifier tableIdentifier;
+ }
+
+ @Value
+ @Builder
+ @Jacksonized
+ public static class TableIdentifier {
+ /**
+ * Specifics the three level hierarchical table identifier for {@link
+ * HierarchicalTableIdentifier}
+ */
+ String hierarchicalId;
+ }
+
+ /**
+ * Configuration in storage for table. This is an optional field in {@link
+ * SourceTableIdentifier}.
+ */
+ @Value
+ @Builder
+ @Jacksonized
+ public static class StorageIdentifier {
+ String tableFormat;
+ String tableBasePath;
+ String tableDataPath;
+ String tableName;
+ String partitionSpec;
+ String namespace;
+ }
+ }
+}
diff --git
a/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunSync.java
b/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunSync.java
index c84753de..1a7bda87 100644
--- a/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunSync.java
+++ b/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunSync.java
@@ -190,12 +190,12 @@ public class RunSync {
try {
conversionController.sync(conversionConfig, conversionSourceProvider);
} catch (Exception e) {
- log.error(String.format("Error running sync for %s",
table.getTableBasePath()), e);
+ log.error("Error running sync for {}", table.getTableBasePath(), e);
}
}
}
- private static byte[] getCustomConfigurations(CommandLine cmd, String
option) throws IOException {
+ static byte[] getCustomConfigurations(CommandLine cmd, String option) throws
IOException {
byte[] customConfig = null;
if (cmd.hasOption(option)) {
customConfig = Files.readAllBytes(Paths.get(cmd.getOptionValue(option)));
diff --git
a/xtable-utilities/src/test/java/org/apache/xtable/utilities/ITRunCatalogSync.java
b/xtable-utilities/src/test/java/org/apache/xtable/utilities/ITRunCatalogSync.java
new file mode 100644
index 00000000..52cff85a
--- /dev/null
+++
b/xtable-utilities/src/test/java/org/apache/xtable/utilities/ITRunCatalogSync.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.xtable.utilities;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import lombok.SneakyThrows;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import org.apache.hudi.common.model.HoodieTableType;
+
+import org.apache.xtable.GenericTable;
+import org.apache.xtable.TestJavaHudiTable;
+import org.apache.xtable.conversion.ExternalCatalogConfig;
+import org.apache.xtable.model.storage.CatalogType;
+import org.apache.xtable.testutil.ITTestUtils;
+import org.apache.xtable.utilities.RunCatalogSync.DatasetConfig;
+import
org.apache.xtable.utilities.RunCatalogSync.DatasetConfig.SourceTableIdentifier;
+import
org.apache.xtable.utilities.RunCatalogSync.DatasetConfig.StorageIdentifier;
+import
org.apache.xtable.utilities.RunCatalogSync.DatasetConfig.TableIdentifier;
+import
org.apache.xtable.utilities.RunCatalogSync.DatasetConfig.TargetTableIdentifier;
+
+public class ITRunCatalogSync {
+
+ private static final List<String> EXPECTED_FUNCTION_CALLS =
+ Arrays.asList(
+ "hasDatabase",
+ "createDatabase",
+ "getTable",
+ "getStorageLocation",
+ "createTable",
+ "getCatalogId");
+
+ @Test
+ void testCatalogSync(@TempDir Path tempDir) throws Exception {
+ String tableName = "test-table";
+ try (GenericTable table =
+ TestJavaHudiTable.forStandardSchema(
+ tableName, tempDir, null, HoodieTableType.COPY_ON_WRITE)) {
+ table.insertRows(20);
+ File configFile = writeConfigFile(tempDir, table, tableName);
+ String[] args = new String[] {"-catalogConfig", configFile.getPath()};
+ RunCatalogSync.main(args);
+ validateTargetMetadataIsPresent(table.getBasePath());
+ Map<String, Integer> functionCalls =
ITTestUtils.TestCatalogSyncImpl.getFunctionCalls();
+ EXPECTED_FUNCTION_CALLS.forEach(
+ (function -> Assertions.assertEquals(2,
functionCalls.get(function))));
+ }
+ }
+
+ private static File writeConfigFile(Path tempDir, GenericTable table, String
tableName)
+ throws IOException {
+ DatasetConfig config =
+ DatasetConfig.builder()
+ .sourceCatalog(
+ ExternalCatalogConfig.builder()
+ .catalogId("source-catalog-1")
+ .catalogType(CatalogType.STORAGE)
+ .build())
+ .targetCatalogs(
+ Collections.singletonList(
+ ExternalCatalogConfig.builder()
+ .catalogId("target-catalog-1")
+
.catalogSyncClientImpl(ITTestUtils.TestCatalogSyncImpl.class.getName())
+ .build()))
+ .datasets(
+ Collections.singletonList(
+ DatasetConfig.Dataset.builder()
+ .sourceCatalogTableIdentifier(
+ SourceTableIdentifier.builder()
+ .storageIdentifier(
+ StorageIdentifier.builder()
+ .tableBasePath(table.getBasePath())
+ .tableName(tableName)
+ .tableFormat("HUDI")
+ .build())
+ .build())
+ .targetCatalogTableIdentifiers(
+ Arrays.asList(
+ TargetTableIdentifier.builder()
+ .catalogId("target-catalog-1")
+ .tableFormat("DELTA")
+ .tableIdentifier(
+ TableIdentifier.builder()
+
.hierarchicalId("database-1.table-1")
+ .build())
+ .build(),
+ TargetTableIdentifier.builder()
+ .catalogId("target-catalog-1")
+ .tableFormat("ICEBERG")
+ .tableIdentifier(
+ TableIdentifier.builder()
+
.hierarchicalId("catalog-2.database-2.table-2")
+ .build())
+ .build()))
+ .build()))
+ .build();
+ File configFile = new File(tempDir + "config.yaml");
+ RunSync.YAML_MAPPER.writeValue(configFile, config);
+ return configFile;
+ }
+
+ @SneakyThrows
+ private void validateTargetMetadataIsPresent(String basePath) {
+ Path icebergMetadataPath = Paths.get(URI.create(basePath + "/metadata"));
+ long icebergMetadataFiles =
+ Files.list(icebergMetadataPath).filter(p ->
p.toString().endsWith("metadata.json")).count();
+ Assertions.assertEquals(2, icebergMetadataFiles);
+ Path deltaMetadataPath = Paths.get(URI.create(basePath + "/_delta_log"));
+ long deltaMetadataFiles =
+ Files.list(deltaMetadataPath).filter(p ->
p.toString().endsWith(".json")).count();
+ Assertions.assertEquals(1, deltaMetadataFiles);
+ }
+}
diff --git
a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergCatalogConfig.java
b/xtable-utilities/src/test/java/org/apache/xtable/utilities/TestRunCatalogSync.java
similarity index 60%
copy from
xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergCatalogConfig.java
copy to
xtable-utilities/src/test/java/org/apache/xtable/utilities/TestRunCatalogSync.java
index d5d7a3c5..3f504ff7 100644
---
a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergCatalogConfig.java
+++
b/xtable-utilities/src/test/java/org/apache/xtable/utilities/TestRunCatalogSync.java
@@ -16,21 +16,23 @@
* limitations under the License.
*/
-package org.apache.xtable.iceberg;
+package org.apache.xtable.utilities;
-import java.util.Collections;
-import java.util.Map;
+import static org.junit.jupiter.api.Assertions.*;
-import lombok.Builder;
-import lombok.NonNull;
-import lombok.Value;
+import lombok.SneakyThrows;
-import org.apache.xtable.conversion.CatalogConfig;
+import org.junit.jupiter.api.Test;
-@Value
-@Builder
-public class IcebergCatalogConfig implements CatalogConfig {
- @NonNull String catalogImpl;
- @NonNull String catalogName;
- @NonNull @Builder.Default Map<String, String> catalogOptions =
Collections.emptyMap();
+class TestRunCatalogSync {
+
+ @SneakyThrows
+ @Test
+ void testMain() {
+ String catalogConfigYamlPath =
+
TestRunCatalogSync.class.getClassLoader().getResource("catalogConfig.yaml").getPath();
+ String[] args = {"-catalogConfig", catalogConfigYamlPath};
+ // Ensure yaml gets parsed without any errors.
+ assertDoesNotThrow(() -> RunCatalogSync.main(args));
+ }
}
diff --git a/xtable-utilities/src/test/resources/catalogConfig.yaml
b/xtable-utilities/src/test/resources/catalogConfig.yaml
new file mode 100644
index 00000000..05b2df4b
--- /dev/null
+++ b/xtable-utilities/src/test/resources/catalogConfig.yaml
@@ -0,0 +1,71 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+sourceCatalog:
+ catalogId: "source-1"
+ catalogConversionSourceImpl:
"org.apache.xtable.testutil.ITTestUtils$TestCatalogConversionSourceImpl"
+ catalogSyncClientImpl:
"org.apache.xtable.testutil.ITTestUtils$TestCatalogImpl"
+ catalogProperties:
+ key01: "value1"
+ key02: "value2"
+ key03: "value3"
+targetCatalogs:
+ - catalogId: "target-1"
+ catalogSyncClientImpl:
"org.apache.xtable.testutil.ITTestUtils$TestCatalogImpl"
+ catalogProperties:
+ key11: "value1"
+ key12: "value2"
+ key13: "value3"
+ - catalogId: "target-2"
+ catalogSyncClientImpl:
"org.apache.xtable.testutil.ITTestUtils$TestCatalogImpl"
+ catalogProperties:
+ key21: "value1"
+ key22: "value2"
+ key23: "value3"
+ - catalogId: "target-3"
+ catalogSyncClientImpl:
"org.apache.xtable.testutil.ITTestUtils$TestCatalogImpl"
+ catalogProperties:
+ key31: "value1"
+ key32: "value2"
+ key33: "value3"
+datasets:
+ - sourceCatalogTableIdentifier:
+ tableIdentifier:
+ hierarchicalId: "source-database-1.source-1"
+ targetCatalogTableIdentifiers:
+ - catalogId: "target-1"
+ tableFormat: "DELTA"
+ tableIdentifier:
+ hierarchicalId: "target-database-1.target-tableName-1"
+ - catalogId: "target-2"
+ tableFormat: "HUDI"
+ tableIdentifier:
+ hierarchicalId: "target-database-2.target-tableName-2-delta"
+ - sourceCatalogTableIdentifier:
+ storageIdentifier:
+ tableBasePath: s3://tpc-ds-datasets/1GB/hudi/catalog_sales
+ tableName: catalog_sales
+ partitionSpec: cs_sold_date_sk:VALUE
+ tableFormat: "HUDI"
+ targetCatalogTableIdentifiers:
+ - catalogId: "target-2"
+ tableFormat: "ICEBERG"
+ tableIdentifier:
+ hierarchicalId: "target-database-2.target-tableName-2"
+ - catalogId: "target-3"
+ tableFormat: "HUDI"
+ tableIdentifier:
+ hierarchicalId:
"default-catalog-2.target-database-3.target-tableName-3"