This is an automated email from the ASF dual-hosted git repository.

vinish pushed a commit to branch 590-CatalogSync
in repository https://gitbox.apache.org/repos/asf/incubator-xtable.git

commit ea325d755b7c3dd6d82f38a9ebf44e2502f63293
Author: Vinish Reddy <[email protected]>
AuthorDate: Wed Dec 18 02:02:59 2024 -0800

    [590] Add RunCatalogSync utility for synchronizing tables across catalogs
---
 .../apache/xtable/conversion/ConversionConfig.java |  10 +-
 .../xtable/conversion/ExternalCatalogConfig.java   |  16 +-
 .../xtable/conversion/TargetCatalogConfig.java     |  24 +-
 .../org/apache/xtable/conversion/TargetTable.java  |   4 +
 .../xtable/catalog/CatalogConversionFactory.java   |  60 +++++
 .../ExternalCatalogConfigFactory.java}             |  26 +-
 .../xtable/conversion/ConversionController.java    | 199 ++++++++++----
 .../ConversionUtils.java}                          |  27 +-
 .../xtable/iceberg/IcebergCatalogConfig.java       |   8 +-
 .../org/apache/xtable/TestSparkDeltaTable.java     |   8 -
 .../catalog/TestCatalogConversionFactory.java      | 122 +++++++++
 .../conversion/TestConversionController.java       | 172 +++++++++++-
 .../apache/xtable/utilities/RunCatalogSync.java    | 292 +++++++++++++++++++++
 .../java/org/apache/xtable/utilities/RunSync.java  |   2 +-
 .../xtable/utilities/TestRunCatalogSync.java       |  99 +++++++
 .../src/test/resources/catalogConfig.yaml          |  75 ++++++
 16 files changed, 1040 insertions(+), 104 deletions(-)

diff --git 
a/xtable-api/src/main/java/org/apache/xtable/conversion/ConversionConfig.java 
b/xtable-api/src/main/java/org/apache/xtable/conversion/ConversionConfig.java
index 73e2628d..8ef52741 100644
--- 
a/xtable-api/src/main/java/org/apache/xtable/conversion/ConversionConfig.java
+++ 
b/xtable-api/src/main/java/org/apache/xtable/conversion/ConversionConfig.java
@@ -19,6 +19,7 @@
 package org.apache.xtable.conversion;
 
 import java.util.List;
+import java.util.Map;
 
 import lombok.Builder;
 import lombok.NonNull;
@@ -34,14 +35,21 @@ public class ConversionConfig {
   @NonNull SourceTable sourceTable;
   // One or more targets to sync the table metadata to
   List<TargetTable> targetTables;
+  // Each target table can be synced to multiple target catalogs; this is a map 
from
+  // the target table id (TargetTable#getId) to its target catalogs.
+  Map<String, List<TargetCatalogConfig>> targetCatalogs;
   // The mode, incremental or snapshot
   SyncMode syncMode;
 
   @Builder
   ConversionConfig(
-      @NonNull SourceTable sourceTable, List<TargetTable> targetTables, 
SyncMode syncMode) {
+      @NonNull SourceTable sourceTable,
+      List<TargetTable> targetTables,
+      Map<String, List<TargetCatalogConfig>> targetCatalogs,
+      SyncMode syncMode) {
     this.sourceTable = sourceTable;
     this.targetTables = targetTables;
+    this.targetCatalogs = targetCatalogs;
     Preconditions.checkArgument(
         targetTables != null && !targetTables.isEmpty(),
         "Please provide at-least one format to sync");
diff --git 
a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergCatalogConfig.java 
b/xtable-api/src/main/java/org/apache/xtable/conversion/ExternalCatalogConfig.java
similarity index 72%
copy from 
xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergCatalogConfig.java
copy to 
xtable-api/src/main/java/org/apache/xtable/conversion/ExternalCatalogConfig.java
index d5d7a3c5..16785ec6 100644
--- 
a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergCatalogConfig.java
+++ 
b/xtable-api/src/main/java/org/apache/xtable/conversion/ExternalCatalogConfig.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
  
-package org.apache.xtable.iceberg;
+package org.apache.xtable.conversion;
 
 import java.util.Collections;
 import java.util.Map;
@@ -25,12 +25,18 @@ import lombok.Builder;
 import lombok.NonNull;
 import lombok.Value;
 
-import org.apache.xtable.conversion.CatalogConfig;
-
+/** Defines the configuration for an external catalog. */
 @Value
 @Builder
-public class IcebergCatalogConfig implements CatalogConfig {
-  @NonNull String catalogImpl;
+public class ExternalCatalogConfig implements CatalogConfig {
+  /** The name of the catalog; it also acts as a unique identifier for each 
catalog */
   @NonNull String catalogName;
+
+  /** The fully qualified class name of the catalog implementation */
+  @NonNull String catalogImpl;
+
+  /**
+   * The properties for each catalog, used for providing any custom behaviour 
during catalog sync
+   */
   @NonNull @Builder.Default Map<String, String> catalogOptions = 
Collections.emptyMap();
 }
diff --git 
a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergCatalogConfig.java 
b/xtable-api/src/main/java/org/apache/xtable/conversion/TargetCatalogConfig.java
similarity index 62%
copy from 
xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergCatalogConfig.java
copy to 
xtable-api/src/main/java/org/apache/xtable/conversion/TargetCatalogConfig.java
index d5d7a3c5..ca6cec2d 100644
--- 
a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergCatalogConfig.java
+++ 
b/xtable-api/src/main/java/org/apache/xtable/conversion/TargetCatalogConfig.java
@@ -16,21 +16,27 @@
  * limitations under the License.
  */
  
-package org.apache.xtable.iceberg;
-
-import java.util.Collections;
-import java.util.Map;
+package org.apache.xtable.conversion;
 
 import lombok.Builder;
 import lombok.NonNull;
 import lombok.Value;
 
-import org.apache.xtable.conversion.CatalogConfig;
+import org.apache.xtable.model.catalog.CatalogTableIdentifier;
 
+/**
+ * TargetCatalogConfig contains the parameters that are required when syncing 
{@link TargetTable} to
+ * a catalog.
+ */
 @Value
 @Builder
-public class IcebergCatalogConfig implements CatalogConfig {
-  @NonNull String catalogImpl;
-  @NonNull String catalogName;
-  @NonNull @Builder.Default Map<String, String> catalogOptions = 
Collections.emptyMap();
+public class TargetCatalogConfig {
+  /**
+   * The table identifier (databaseName, tableName) that will be used when 
syncing {@link
+   * TargetTable} to the catalog.
+   */
+  @NonNull CatalogTableIdentifier catalogTableIdentifier;
+
+  /** Configuration for the catalog. */
+  @NonNull ExternalCatalogConfig catalogConfig;
 }
diff --git 
a/xtable-api/src/main/java/org/apache/xtable/conversion/TargetTable.java 
b/xtable-api/src/main/java/org/apache/xtable/conversion/TargetTable.java
index 6256da2c..7f503b75 100644
--- a/xtable-api/src/main/java/org/apache/xtable/conversion/TargetTable.java
+++ b/xtable-api/src/main/java/org/apache/xtable/conversion/TargetTable.java
@@ -44,4 +44,8 @@ public class TargetTable extends ExternalTable {
     this.metadataRetention =
         metadataRetention == null ? Duration.of(7, ChronoUnit.DAYS) : 
metadataRetention;
   }
+
+  public String getId() {
+    return String.format("%s#%s", sanitizeBasePath(this.basePath), formatName);
+  }
 }
diff --git 
a/xtable-core/src/main/java/org/apache/xtable/catalog/CatalogConversionFactory.java
 
b/xtable-core/src/main/java/org/apache/xtable/catalog/CatalogConversionFactory.java
new file mode 100644
index 00000000..2a4e5d2e
--- /dev/null
+++ 
b/xtable-core/src/main/java/org/apache/xtable/catalog/CatalogConversionFactory.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.xtable.catalog;
+
+import org.apache.hadoop.conf.Configuration;
+
+import org.apache.xtable.conversion.ExternalCatalogConfig;
+import org.apache.xtable.reflection.ReflectionUtils;
+import org.apache.xtable.spi.extractor.CatalogConversionSource;
+import org.apache.xtable.spi.sync.CatalogSyncClient;
+
+public class CatalogConversionFactory {
+  private static final CatalogConversionFactory INSTANCE = new 
CatalogConversionFactory();
+
+  public static CatalogConversionFactory getInstance() {
+    return INSTANCE;
+  }
+
+  /**
+   * Returns an implementation class for {@link CatalogConversionSource} 
that's used for converting
+   * table definitions in the catalog to {@link 
org.apache.xtable.conversion.SourceTable} object.
+   *
+   * @param sourceCatalogConfig configuration for the source catalog
+   * @param configuration hadoop configuration
+   */
+  public static CatalogConversionSource createCatalogConversionSource(
+      ExternalCatalogConfig sourceCatalogConfig, Configuration configuration) {
+    return ReflectionUtils.createInstanceOfClass(
+        sourceCatalogConfig.getCatalogImpl(), sourceCatalogConfig, 
configuration);
+  }
+
+  /**
+   * Returns an implementation class for {@link CatalogSyncClient} that's used 
for syncing {@link
+   * org.apache.xtable.conversion.TargetTable} to a catalog.
+   *
+   * @param targetCatalogConfig configuration for the target catalog
+   * @param configuration hadoop configuration
+   */
+  public CatalogSyncClient createCatalogSyncClient(
+      ExternalCatalogConfig targetCatalogConfig, Configuration configuration) {
+    return ReflectionUtils.createInstanceOfClass(
+        targetCatalogConfig.getCatalogImpl(), targetCatalogConfig, 
configuration);
+  }
+}
diff --git 
a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergCatalogConfig.java 
b/xtable-core/src/main/java/org/apache/xtable/catalog/ExternalCatalogConfigFactory.java
similarity index 57%
copy from 
xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergCatalogConfig.java
copy to 
xtable-core/src/main/java/org/apache/xtable/catalog/ExternalCatalogConfigFactory.java
index d5d7a3c5..09bf1566 100644
--- 
a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergCatalogConfig.java
+++ 
b/xtable-core/src/main/java/org/apache/xtable/catalog/ExternalCatalogConfigFactory.java
@@ -16,21 +16,23 @@
  * limitations under the License.
  */
  
-package org.apache.xtable.iceberg;
+package org.apache.xtable.catalog;
 
-import java.util.Collections;
 import java.util.Map;
 
-import lombok.Builder;
-import lombok.NonNull;
-import lombok.Value;
+import org.apache.xtable.conversion.ExternalCatalogConfig;
 
-import org.apache.xtable.conversion.CatalogConfig;
+/** A factory class which returns {@link ExternalCatalogConfig} based on 
catalogType. */
+public class ExternalCatalogConfigFactory {
 
-@Value
-@Builder
-public class IcebergCatalogConfig implements CatalogConfig {
-  @NonNull String catalogImpl;
-  @NonNull String catalogName;
-  @NonNull @Builder.Default Map<String, String> catalogOptions = 
Collections.emptyMap();
+  public static ExternalCatalogConfig fromCatalogType(
+      String catalogType, String catalogName, Map<String, String> properties) {
+    // TODO: Choose existing implementation based on catalogType.
+    String catalogImpl = "";
+    return ExternalCatalogConfig.builder()
+        .catalogImpl(catalogImpl)
+        .catalogName(catalogName)
+        .catalogOptions(properties)
+        .build();
+  }
 }
diff --git 
a/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionController.java
 
b/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionController.java
index 222652a6..df72ff5c 100644
--- 
a/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionController.java
+++ 
b/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionController.java
@@ -18,6 +18,8 @@
  
 package org.apache.xtable.conversion;
 
+import static 
org.apache.xtable.conversion.ConversionUtils.convertToSourceTable;
+
 import java.io.IOException;
 import java.time.Instant;
 import java.util.Collection;
@@ -37,15 +39,19 @@ import lombok.extern.log4j.Log4j2;
 
 import org.apache.hadoop.conf.Configuration;
 
+import org.apache.xtable.catalog.CatalogConversionFactory;
 import org.apache.xtable.exception.ReadException;
 import org.apache.xtable.model.IncrementalTableChanges;
 import org.apache.xtable.model.InstantsForIncrementalSync;
 import org.apache.xtable.model.InternalSnapshot;
+import org.apache.xtable.model.catalog.CatalogTableIdentifier;
 import org.apache.xtable.model.metadata.TableSyncMetadata;
 import org.apache.xtable.model.sync.SyncMode;
 import org.apache.xtable.model.sync.SyncResult;
 import org.apache.xtable.spi.extractor.ConversionSource;
 import org.apache.xtable.spi.extractor.ExtractFromSource;
+import org.apache.xtable.spi.sync.CatalogSync;
+import org.apache.xtable.spi.sync.CatalogSyncClient;
 import org.apache.xtable.spi.sync.ConversionTarget;
 import org.apache.xtable.spi.sync.TableFormatSync;
 
@@ -64,10 +70,17 @@ import org.apache.xtable.spi.sync.TableFormatSync;
 public class ConversionController {
   private final Configuration conf;
   private final ConversionTargetFactory conversionTargetFactory;
+  private final CatalogConversionFactory catalogConversionFactory;
   private final TableFormatSync tableFormatSync;
+  private final CatalogSync catalogSync;
 
   public ConversionController(Configuration conf) {
-    this(conf, ConversionTargetFactory.getInstance(), 
TableFormatSync.getInstance());
+    this(
+        conf,
+        ConversionTargetFactory.getInstance(),
+        CatalogConversionFactory.getInstance(),
+        TableFormatSync.getInstance(),
+        CatalogSync.getInstance());
   }
 
   /**
@@ -89,57 +102,133 @@ public class ConversionController {
     try (ConversionSource<COMMIT> conversionSource =
         
conversionSourceProvider.getConversionSourceInstance(config.getSourceTable())) {
       ExtractFromSource<COMMIT> source = 
ExtractFromSource.of(conversionSource);
+      return syncTableFormats(config, source, config.getSyncMode());
+    } catch (IOException ioException) {
+      throw new ReadException("Failed to close source converter", ioException);
+    }
+  }
 
-      Map<String, ConversionTarget> conversionTargetByFormat =
-          config.getTargetTables().stream()
-              .collect(
-                  Collectors.toMap(
-                      TargetTable::getFormatName,
-                      targetTable -> 
conversionTargetFactory.createForFormat(targetTable, conf)));
-      // State for each TableFormat
-      Map<String, Optional<TableSyncMetadata>> lastSyncMetadataByFormat =
-          conversionTargetByFormat.entrySet().stream()
-              .collect(
-                  Collectors.toMap(
-                      Map.Entry::getKey, entry -> 
entry.getValue().getTableMetadata()));
-      Map<String, ConversionTarget> formatsToSyncIncrementally =
-          getFormatsToSyncIncrementally(
-              config,
-              conversionTargetByFormat,
-              lastSyncMetadataByFormat,
-              source.getConversionSource());
-      Map<String, ConversionTarget> formatsToSyncBySnapshot =
-          conversionTargetByFormat.entrySet().stream()
-              .filter(entry -> 
!formatsToSyncIncrementally.containsKey(entry.getKey()))
-              .collect(Collectors.toMap(Map.Entry::getKey, 
Map.Entry::getValue));
-      SyncResultForTableFormats syncResultForSnapshotSync =
-          formatsToSyncBySnapshot.isEmpty()
-              ? SyncResultForTableFormats.builder().build()
-              : syncSnapshot(formatsToSyncBySnapshot, source);
-      SyncResultForTableFormats syncResultForIncrementalSync =
-          formatsToSyncIncrementally.isEmpty()
-              ? SyncResultForTableFormats.builder().build()
-              : syncIncrementalChanges(
-                  formatsToSyncIncrementally, lastSyncMetadataByFormat, 
source);
-      Map<String, SyncResult> syncResultsMerged =
-          new HashMap<>(syncResultForIncrementalSync.getLastSyncResult());
-      syncResultsMerged.putAll(syncResultForSnapshotSync.getLastSyncResult());
-      String successfulSyncs =
-          getFormatsWithStatusCode(syncResultsMerged, 
SyncResult.SyncStatusCode.SUCCESS);
-      if (!successfulSyncs.isEmpty()) {
-        log.info("Sync is successful for the following formats {}", 
successfulSyncs);
-      }
-      String failedSyncs =
-          getFormatsWithStatusCode(syncResultsMerged, 
SyncResult.SyncStatusCode.ERROR);
-      if (!failedSyncs.isEmpty()) {
-        log.error("Sync failed for the following formats {}", failedSyncs);
+  /**
+   * Synchronizes the source table in conversion config to multiple target 
catalogs. If the
+   * configuration for the target table uses a different table format, 
synchronizes the table format
+   * first before syncing it to target catalog
+   *
+   * @param config A per table level config containing source table, target 
tables, target catalogs
+   *     and syncMode.
+   * @param conversionSourceProvider A provider for the {@link 
ConversionSource} instance for each
+   *     tableFormat, {@link ConversionSourceProvider#init(Configuration)} 
must be called before
+   *     calling this method.
+   * @return Returns a map containing the table format and its sync result 
for the sync run
+   *     with the provided per table level configuration.
+   */
+  public Map<String, SyncResult> syncTableAcrossCatalogs(
+      ConversionConfig config, Map<String, ConversionSourceProvider> 
conversionSourceProvider) {
+    if (config.getTargetTables() == null || 
config.getTargetTables().isEmpty()) {
+      throw new IllegalArgumentException("Please provide at-least one format 
to sync");
+    }
+    try (ConversionSource conversionSource =
+        conversionSourceProvider
+            .get(config.getSourceTable().getFormatName())
+            .getConversionSourceInstance(config.getSourceTable())) {
+      ExtractFromSource source = ExtractFromSource.of(conversionSource);
+      Map<String, SyncResult> tableFormatSyncResults =
+          syncTableFormats(config, source, config.getSyncMode());
+      Map<String, SyncResult> catalogSyncResults = new HashMap<>();
+      for (TargetTable targetTable : config.getTargetTables()) {
+        Map<CatalogTableIdentifier, CatalogSyncClient> catalogSyncClients =
+            config.getTargetCatalogs().get(targetTable.getId()).stream()
+                .collect(
+                    Collectors.toMap(
+                        TargetCatalogConfig::getCatalogTableIdentifier,
+                        targetCatalog ->
+                            catalogConversionFactory.createCatalogSyncClient(
+                                targetCatalog.getCatalogConfig(), conf)));
+        catalogSyncResults.put(
+            targetTable.getFormatName(),
+            syncCatalogsForTargetTable(
+                targetTable,
+                catalogSyncClients,
+                conversionSourceProvider.get(targetTable.getFormatName())));
       }
-      return syncResultsMerged;
+      mergeSyncResults(tableFormatSyncResults, catalogSyncResults);
+      return tableFormatSyncResults;
     } catch (IOException ioException) {
       throw new ReadException("Failed to close source converter", ioException);
     }
   }
 
+  private <COMMIT> Map<String, SyncResult> syncTableFormats(
+      ConversionConfig config, ExtractFromSource<COMMIT> source, SyncMode 
syncMode) {
+    Map<String, ConversionTarget> conversionTargetByFormat =
+        config.getTargetTables().stream()
+            .filter(
+                targetTable ->
+                    
!targetTable.getFormatName().equals(config.getSourceTable().getFormatName()))
+            .collect(
+                Collectors.toMap(
+                    TargetTable::getFormatName,
+                    targetTable -> 
conversionTargetFactory.createForFormat(targetTable, conf)));
+
+    Map<String, Optional<TableSyncMetadata>> lastSyncMetadataByFormat =
+        conversionTargetByFormat.entrySet().stream()
+            .collect(
+                Collectors.toMap(Map.Entry::getKey, entry -> 
entry.getValue().getTableMetadata()));
+    Map<String, ConversionTarget> formatsToSyncIncrementally =
+        getFormatsToSyncIncrementally(
+            syncMode,
+            conversionTargetByFormat,
+            lastSyncMetadataByFormat,
+            source.getConversionSource());
+    Map<String, ConversionTarget> formatsToSyncBySnapshot =
+        conversionTargetByFormat.entrySet().stream()
+            .filter(entry -> 
!formatsToSyncIncrementally.containsKey(entry.getKey()))
+            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+    SyncResultForTableFormats syncResultForSnapshotSync =
+        formatsToSyncBySnapshot.isEmpty()
+            ? SyncResultForTableFormats.builder().build()
+            : syncSnapshot(formatsToSyncBySnapshot, source);
+    SyncResultForTableFormats syncResultForIncrementalSync =
+        formatsToSyncIncrementally.isEmpty()
+            ? SyncResultForTableFormats.builder().build()
+            : syncIncrementalChanges(formatsToSyncIncrementally, 
lastSyncMetadataByFormat, source);
+    Map<String, SyncResult> syncResultsMerged =
+        new HashMap<>(syncResultForIncrementalSync.getLastSyncResult());
+    syncResultsMerged.putAll(syncResultForSnapshotSync.getLastSyncResult());
+    String successfulSyncs =
+        getFormatsWithStatusCode(syncResultsMerged, 
SyncResult.SyncStatusCode.SUCCESS);
+    if (!successfulSyncs.isEmpty()) {
+      log.info("Sync is successful for the following formats {}", 
successfulSyncs);
+    }
+    String failedSyncs =
+        getFormatsWithStatusCode(syncResultsMerged, 
SyncResult.SyncStatusCode.ERROR);
+    if (!failedSyncs.isEmpty()) {
+      log.error("Sync failed for the following formats {}", failedSyncs);
+    }
+    return syncResultsMerged;
+  }
+
+  /**
+   * Synchronizes the target table to multiple target catalogs.
+   *
+   * @param targetTable target table that needs to be synced.
+   * @param catalogSyncClients Collection of catalog sync clients along with 
their table identifiers
+   *     for each target catalog.
+   * @param conversionSourceProvider A provider for the {@link 
ConversionSource} instance for the
+   *     table format of targetTable.
+   */
+  private SyncResult syncCatalogsForTargetTable(
+      TargetTable targetTable,
+      Map<CatalogTableIdentifier, CatalogSyncClient> catalogSyncClients,
+      ConversionSourceProvider conversionSourceProvider) {
+    return catalogSync.syncTable(
+        catalogSyncClients,
+        // We get the latest state of InternalTable for TargetTable
+        // and then synchronize it to catalogSyncClients.
+        conversionSourceProvider
+            .getConversionSourceInstance(convertToSourceTable(targetTable))
+            .getCurrentTable());
+  }
+
   private static String getFormatsWithStatusCode(
       Map<String, SyncResult> syncResultsMerged, SyncResult.SyncStatusCode 
statusCode) {
     return syncResultsMerged.entrySet().stream()
@@ -149,11 +238,11 @@ public class ConversionController {
   }
 
   private <COMMIT> Map<String, ConversionTarget> getFormatsToSyncIncrementally(
-      ConversionConfig conversionConfig,
+      SyncMode syncMode,
       Map<String, ConversionTarget> conversionTargetByFormat,
       Map<String, Optional<TableSyncMetadata>> lastSyncMetadataByFormat,
       ConversionSource<COMMIT> conversionSource) {
-    if (conversionConfig.getSyncMode() == SyncMode.FULL) {
+    if (syncMode == SyncMode.FULL) {
       // Full sync requested by config, hence no incremental sync.
       return Collections.emptyMap();
     }
@@ -268,6 +357,22 @@ public class ConversionController {
         .build();
   }
 
+  private void mergeSyncResults(
+      Map<String, SyncResult> syncResultsMerged, Map<String, SyncResult> 
catalogSyncResults) {
+    catalogSyncResults.forEach(
+        (tableFormat, catalogSyncResult) -> {
+          syncResultsMerged.computeIfPresent(
+              tableFormat,
+              (k, syncResult) ->
+                  syncResult.toBuilder()
+                      .syncDuration(
+                          
syncResult.getSyncDuration().plus(catalogSyncResult.getSyncDuration()))
+                      
.catalogSyncStatusList(catalogSyncResult.getCatalogSyncStatusList())
+                      .build());
+          syncResultsMerged.computeIfAbsent(tableFormat, k -> 
catalogSyncResult);
+        });
+  }
+
   @Value
   @Builder
   private static class SyncResultForTableFormats {
diff --git 
a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergCatalogConfig.java 
b/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionUtils.java
similarity index 65%
copy from 
xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergCatalogConfig.java
copy to 
xtable-core/src/main/java/org/apache/xtable/conversion/ConversionUtils.java
index d5d7a3c5..f21be670 100644
--- 
a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergCatalogConfig.java
+++ 
b/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionUtils.java
@@ -16,21 +16,18 @@
  * limitations under the License.
  */
  
-package org.apache.xtable.iceberg;
+package org.apache.xtable.conversion;
 
-import java.util.Collections;
-import java.util.Map;
+public class ConversionUtils {
 
-import lombok.Builder;
-import lombok.NonNull;
-import lombok.Value;
-
-import org.apache.xtable.conversion.CatalogConfig;
-
-@Value
-@Builder
-public class IcebergCatalogConfig implements CatalogConfig {
-  @NonNull String catalogImpl;
-  @NonNull String catalogName;
-  @NonNull @Builder.Default Map<String, String> catalogOptions = 
Collections.emptyMap();
+  public static SourceTable convertToSourceTable(TargetTable table) {
+    return new SourceTable(
+        table.getName(),
+        table.getFormatName(),
+        table.getBasePath(),
+        table.getBasePath(),
+        table.getNamespace(),
+        table.getCatalogConfig(),
+        table.getAdditionalProperties());
+  }
 }
diff --git 
a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergCatalogConfig.java 
b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergCatalogConfig.java
index d5d7a3c5..b678bf00 100644
--- 
a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergCatalogConfig.java
+++ 
b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergCatalogConfig.java
@@ -27,10 +27,16 @@ import lombok.Value;
 
 import org.apache.xtable.conversion.CatalogConfig;
 
+/**
+ * Iceberg requires a catalog to perform any operation, if no catalog is 
provided the default
+ * catalog (HadoopCatalog or storage based catalog) is used. For syncing 
iceberg to multiple
+ * catalogs, you can use {@link 
org.apache.xtable.conversion.ExternalCatalogConfig} instead which
+ * allows syncing the latest version of iceberg metadata to multiple catalogs.
+ */
 @Value
 @Builder
 public class IcebergCatalogConfig implements CatalogConfig {
-  @NonNull String catalogImpl;
   @NonNull String catalogName;
+  @NonNull String catalogImpl;
   @NonNull @Builder.Default Map<String, String> catalogOptions = 
Collections.emptyMap();
 }
diff --git 
a/xtable-core/src/test/java/org/apache/xtable/TestSparkDeltaTable.java 
b/xtable-core/src/test/java/org/apache/xtable/TestSparkDeltaTable.java
index ee5b1ccd..a458070b 100644
--- a/xtable-core/src/test/java/org/apache/xtable/TestSparkDeltaTable.java
+++ b/xtable-core/src/test/java/org/apache/xtable/TestSparkDeltaTable.java
@@ -134,15 +134,7 @@ public class TestSparkDeltaTable implements 
GenericTable<Row, Object>, Closeable
   }
 
   @SneakyThrows
-  @Override
   public void deleteRows(List<Row> deleteRows) {
-    String idsToDelete =
-        deleteRows.stream().map(row -> 
row.get(0).toString()).collect(Collectors.joining(", "));
-    deltaTable.delete("id in (" + idsToDelete + ")");
-  }
-
-  @SneakyThrows
-  public void mergeDeleteRows(List<Row> deleteRows) {
     List<Row> deletes = 
testDeltaHelper.transformForUpsertsOrDeletes(deleteRows, false);
     Dataset<Row> deleteDataset =
         sparkSession.createDataFrame(deletes, 
testDeltaHelper.getTableStructSchema());
diff --git 
a/xtable-core/src/test/java/org/apache/xtable/catalog/TestCatalogConversionFactory.java
 
b/xtable-core/src/test/java/org/apache/xtable/catalog/TestCatalogConversionFactory.java
new file mode 100644
index 00000000..5fd6523f
--- /dev/null
+++ 
b/xtable-core/src/test/java/org/apache/xtable/catalog/TestCatalogConversionFactory.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.xtable.catalog;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.util.Collections;
+
+import org.apache.hadoop.conf.Configuration;
+import org.junit.jupiter.api.Test;
+
+import org.apache.xtable.conversion.ExternalCatalogConfig;
+import org.apache.xtable.conversion.SourceTable;
+import org.apache.xtable.conversion.TargetCatalogConfig;
+import org.apache.xtable.model.InternalTable;
+import org.apache.xtable.model.catalog.CatalogTableIdentifier;
+import org.apache.xtable.spi.extractor.CatalogConversionSource;
+import org.apache.xtable.spi.sync.CatalogSyncClient;
+
+class TestCatalogConversionFactory {
+
+  @Test
+  void createSourceForConfig() {
+    ExternalCatalogConfig sourceCatalog =
+        ExternalCatalogConfig.builder()
+            .catalogName("catalogName")
+            .catalogImpl(TestCatalogImpl.class.getName())
+            .catalogOptions(Collections.emptyMap())
+            .build();
+    CatalogConversionSource catalogConversionSource =
+        CatalogConversionFactory.createCatalogConversionSource(sourceCatalog, 
new Configuration());
+    assertEquals(catalogConversionSource.getClass().getName(), 
TestCatalogImpl.class.getName());
+  }
+
+  @Test
+  void createForCatalog() {
+    TargetCatalogConfig targetCatalogConfig =
+        TargetCatalogConfig.builder()
+            .catalogConfig(
+                ExternalCatalogConfig.builder()
+                    .catalogName("catalogName")
+                    .catalogImpl(TestCatalogImpl.class.getName())
+                    .catalogOptions(Collections.emptyMap())
+                    .build())
+            .catalogTableIdentifier(
+                CatalogTableIdentifier.builder()
+                    .databaseName("target-database")
+                    .tableName("target-tableName")
+                    .build())
+            .build();
+    CatalogSyncClient catalogSyncClient =
+        CatalogConversionFactory.getInstance()
+            .createCatalogSyncClient(targetCatalogConfig.getCatalogConfig(), 
new Configuration());
+    assertEquals(catalogSyncClient.getClass().getName(), 
TestCatalogImpl.class.getName());
+  }
+
+  public static class TestCatalogImpl
+      implements CatalogSyncClient<Object>, CatalogConversionSource {
+
+    public TestCatalogImpl(ExternalCatalogConfig catalogConfig, Configuration 
hadoopConf) {}
+
+    @Override
+    public SourceTable getSourceTable(CatalogTableIdentifier tableIdentifier) {
+      return null;
+    }
+
+    @Override
+    public String getCatalogName() {
+      return null;
+    }
+
+    @Override
+    public String getStorageDescriptorLocation(Object o) {
+      return null;
+    }
+
+    @Override
+    public boolean hasDatabase(String databaseName) {
+      return false;
+    }
+
+    @Override
+    public void createDatabase(String databaseName) {}
+
+    @Override
+    public Object getTable(CatalogTableIdentifier tableIdentifier) {
+      return null;
+    }
+
+    @Override
+    public void createTable(InternalTable table, CatalogTableIdentifier 
tableIdentifier) {}
+
+    @Override
+    public void refreshTable(
+        InternalTable table, Object catalogTable, CatalogTableIdentifier 
tableIdentifier) {}
+
+    @Override
+    public void createOrReplaceTable(InternalTable table, 
CatalogTableIdentifier tableIdentifier) {}
+
+    @Override
+    public void dropTable(InternalTable table, CatalogTableIdentifier 
tableIdentifier) {}
+
+    @Override
+    public void close() throws Exception {}
+  }
+}
diff --git 
a/xtable-core/src/test/java/org/apache/xtable/conversion/TestConversionController.java
 
b/xtable-core/src/test/java/org/apache/xtable/conversion/TestConversionController.java
index caba8046..f7635d91 100644
--- 
a/xtable-core/src/test/java/org/apache/xtable/conversion/TestConversionController.java
+++ 
b/xtable-core/src/test/java/org/apache/xtable/conversion/TestConversionController.java
@@ -18,8 +18,12 @@
  
 package org.apache.xtable.conversion;
 
+import static 
org.apache.xtable.conversion.ConversionUtils.convertToSourceTable;
+import static org.apache.xtable.model.storage.TableFormat.DELTA;
 import static org.apache.xtable.model.storage.TableFormat.HUDI;
+import static org.apache.xtable.model.storage.TableFormat.ICEBERG;
 import static org.junit.jupiter.api.Assertions.*;
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.argThat;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.mock;
@@ -43,17 +47,23 @@ import org.apache.hadoop.conf.Configuration;
 import org.junit.jupiter.api.Test;
 import org.mockito.ArgumentMatcher;
 
+import com.google.common.collect.ImmutableMap;
+
+import org.apache.xtable.catalog.CatalogConversionFactory;
 import org.apache.xtable.model.CommitsBacklog;
 import org.apache.xtable.model.IncrementalTableChanges;
 import org.apache.xtable.model.InstantsForIncrementalSync;
 import org.apache.xtable.model.InternalSnapshot;
 import org.apache.xtable.model.InternalTable;
 import org.apache.xtable.model.TableChange;
+import org.apache.xtable.model.catalog.CatalogTableIdentifier;
 import org.apache.xtable.model.metadata.TableSyncMetadata;
 import org.apache.xtable.model.storage.TableFormat;
 import org.apache.xtable.model.sync.SyncMode;
 import org.apache.xtable.model.sync.SyncResult;
 import org.apache.xtable.spi.extractor.ConversionSource;
+import org.apache.xtable.spi.sync.CatalogSync;
+import org.apache.xtable.spi.sync.CatalogSyncClient;
 import org.apache.xtable.spi.sync.ConversionTarget;
 import org.apache.xtable.spi.sync.TableFormatSync;
 
@@ -62,12 +72,22 @@ public class TestConversionController {
   private final Configuration mockConf = mock(Configuration.class);
   private final ConversionSourceProvider<Instant> mockConversionSourceProvider 
=
       mock(ConversionSourceProvider.class);
+  private final ConversionSourceProvider<Instant> 
mockConversionSourceProvider2 =
+      mock(ConversionSourceProvider.class);
+  private final ConversionSourceProvider<Instant> 
mockConversionSourceProvider3 =
+      mock(ConversionSourceProvider.class);
+
   private final ConversionSource<Instant> mockConversionSource = 
mock(ConversionSource.class);
   private final ConversionTargetFactory mockConversionTargetFactory =
       mock(ConversionTargetFactory.class);
+  private final CatalogConversionFactory mockCatalogConversionFactory =
+      mock(CatalogConversionFactory.class);
   private final TableFormatSync tableFormatSync = mock(TableFormatSync.class);
+  private final CatalogSync catalogSync = mock(CatalogSync.class);
   private final ConversionTarget mockConversionTarget1 = 
mock(ConversionTarget.class);
   private final ConversionTarget mockConversionTarget2 = 
mock(ConversionTarget.class);
+  private final CatalogSyncClient mockCatalogSyncClient1 = 
mock(CatalogSyncClient.class);
+  private final CatalogSyncClient mockCatalogSyncClient2 = 
mock(CatalogSyncClient.class);
 
   @Test
   void testAllSnapshotSyncAsPerConfig() {
@@ -96,7 +116,12 @@ public class TestConversionController {
             eq(internalSnapshot)))
         .thenReturn(perTableResults);
     ConversionController conversionController =
-        new ConversionController(mockConf, mockConversionTargetFactory, 
tableFormatSync);
+        new ConversionController(
+            mockConf,
+            mockConversionTargetFactory,
+            mockCatalogConversionFactory,
+            tableFormatSync,
+            catalogSync);
     Map<String, SyncResult> result =
         conversionController.sync(conversionConfig, 
mockConversionSourceProvider);
     assertEquals(perTableResults, result);
@@ -182,7 +207,12 @@ public class TestConversionController {
     expectedSyncResult.put(TableFormat.ICEBERG, 
getLastSyncResult(icebergSyncResults));
     expectedSyncResult.put(TableFormat.DELTA, 
getLastSyncResult(deltaSyncResults));
     ConversionController conversionController =
-        new ConversionController(mockConf, mockConversionTargetFactory, 
tableFormatSync);
+        new ConversionController(
+            mockConf,
+            mockConversionTargetFactory,
+            mockCatalogConversionFactory,
+            tableFormatSync,
+            catalogSync);
     Map<String, SyncResult> result =
         conversionController.sync(conversionConfig, 
mockConversionSourceProvider);
     assertEquals(expectedSyncResult, result);
@@ -226,7 +256,12 @@ public class TestConversionController {
             eq(internalSnapshot)))
         .thenReturn(syncResults);
     ConversionController conversionController =
-        new ConversionController(mockConf, mockConversionTargetFactory, 
tableFormatSync);
+        new ConversionController(
+            mockConf,
+            mockConversionTargetFactory,
+            mockCatalogConversionFactory,
+            tableFormatSync,
+            catalogSync);
     Map<String, SyncResult> result =
         conversionController.sync(conversionConfig, 
mockConversionSourceProvider);
     assertEquals(syncResults, result);
@@ -310,7 +345,12 @@ public class TestConversionController {
     expectedSyncResult.put(TableFormat.ICEBERG, syncResult);
     expectedSyncResult.put(TableFormat.DELTA, 
getLastSyncResult(deltaSyncResults));
     ConversionController conversionController =
-        new ConversionController(mockConf, mockConversionTargetFactory, 
tableFormatSync);
+        new ConversionController(
+            mockConf,
+            mockConversionTargetFactory,
+            mockCatalogConversionFactory,
+            tableFormatSync,
+            catalogSync);
     Map<String, SyncResult> result =
         conversionController.sync(conversionConfig, 
mockConversionSourceProvider);
     assertEquals(expectedSyncResult, result);
@@ -368,16 +408,101 @@ public class TestConversionController {
     // Iceberg and Delta have no commits to sync
     Map<String, SyncResult> expectedSyncResult = Collections.emptyMap();
     ConversionController conversionController =
-        new ConversionController(mockConf, mockConversionTargetFactory, 
tableFormatSync);
+        new ConversionController(
+            mockConf,
+            mockConversionTargetFactory,
+            mockCatalogConversionFactory,
+            tableFormatSync,
+            catalogSync);
     Map<String, SyncResult> result =
         conversionController.sync(conversionConfig, 
mockConversionSourceProvider);
     assertEquals(expectedSyncResult, result);
   }
 
+  /**
+   * Runs {@code syncTableAcrossCatalogs} for one source table with ICEBERG and DELTA targets, each
+   * of which is registered in the same two target catalogs, and checks the merged per-format
+   * result.
+   *
+   * <p>The stubbed table format sync reports a duration of 1 second and the stubbed catalog sync
+   * reports 3 seconds; the expected result carries a duration of 4 seconds. NOTE(review): this
+   * looks like the sum of both durations, the merge itself happens inside ConversionController.
+   */
+  @Test
+  void testNoTableFormatConversionWithMultipleCatalogSync() {
+    SyncMode syncMode = SyncMode.INCREMENTAL;
+    List<TargetCatalogConfig> targetCatalogs =
+        Arrays.asList(getTargetCatalog("1"), getTargetCatalog("2"));
+    InternalTable internalTable = getInternalTable();
+    InternalSnapshot internalSnapshot = buildSnapshot(internalTable, "v1");
+    // Conversion source and target mocks: the same mocked source is returned for the source
+    // table and for each target table when it is read back as a source table.
+    ConversionConfig conversionConfig =
+        getTableSyncConfig(
+            Arrays.asList(TableFormat.ICEBERG, TableFormat.DELTA), syncMode, 
targetCatalogs);
+    when(mockConversionSourceProvider.getConversionSourceInstance(
+            conversionConfig.getSourceTable()))
+        .thenReturn(mockConversionSource);
+    when(mockConversionSourceProvider.getConversionSourceInstance(
+            convertToSourceTable(conversionConfig.getTargetTables().get(0))))
+        .thenReturn(mockConversionSource);
+    when(mockConversionSourceProvider.getConversionSourceInstance(
+            convertToSourceTable(conversionConfig.getTargetTables().get(1))))
+        .thenReturn(mockConversionSource);
+    when(mockConversionTargetFactory.createForFormat(
+            conversionConfig.getTargetTables().get(0), mockConf))
+        .thenReturn(mockConversionTarget1);
+    when(mockConversionTargetFactory.createForFormat(
+            conversionConfig.getTargetTables().get(1), mockConf))
+        .thenReturn(mockConversionTarget2);
+    
when(mockConversionSource.getCurrentSnapshot()).thenReturn(internalSnapshot);
+    
when(mockConversionSource.getCurrentTable()).thenReturn(getInternalTable());
+    // Mocks for tableFormatSync: a snapshot sync of both targets yields the same 1 second
+    // result for ICEBERG and DELTA.
+    Instant instantBeforeHour = Instant.now().minus(Duration.ofHours(1));
+    SyncResult syncResult = buildSyncResult(syncMode, instantBeforeHour, 
Duration.ofSeconds(1));
+    Map<String, SyncResult> tableFormatSyncResults =
+        buildPerTableResult(Arrays.asList(ICEBERG, DELTA), syncResult);
+    when(tableFormatSync.syncSnapshot(
+            argThat(containsAll(Arrays.asList(mockConversionTarget1, 
mockConversionTarget2))),
+            eq(internalSnapshot)))
+        .thenReturn(tableFormatSyncResults);
+    // Mocks for catalogSync: one client per target catalog; syncing the table to both catalog
+    // identifiers yields a 3 second result.
+    when(mockCatalogConversionFactory.createCatalogSyncClient(
+            targetCatalogs.get(0).getCatalogConfig(), mockConf))
+        .thenReturn(mockCatalogSyncClient1);
+    when(mockCatalogConversionFactory.createCatalogSyncClient(
+            targetCatalogs.get(1).getCatalogConfig(), mockConf))
+        .thenReturn(mockCatalogSyncClient2);
+    when(catalogSync.syncTable(
+            eq(
+                ImmutableMap.of(
+                    targetCatalogs.get(0).getCatalogTableIdentifier(), 
mockCatalogSyncClient1,
+                    targetCatalogs.get(1).getCatalogTableIdentifier(), 
mockCatalogSyncClient2)),
+            any()))
+        .thenReturn(buildSyncResult(syncMode, Instant.now(), 
Duration.ofSeconds(3)));
+    ConversionController conversionController =
+        new ConversionController(
+            mockConf,
+            mockConversionTargetFactory,
+            mockCatalogConversionFactory,
+            tableFormatSync,
+            catalogSync);
+    // Mocks for conversionSourceProviders: every format resolves to the same mocked provider.
+    Map<String, ConversionSourceProvider> conversionSourceProviders = new 
HashMap<>();
+    conversionSourceProviders.put(HUDI, mockConversionSourceProvider);
+    conversionSourceProviders.put(ICEBERG, mockConversionSourceProvider);
+    conversionSourceProviders.put(DELTA, mockConversionSourceProvider);
+    // Assert results: same as the table format result, except for the 4 second duration.
+    Map<String, SyncResult> mergedSyncResults =
+        buildPerTableResult(
+            Arrays.asList(ICEBERG, DELTA),
+            
syncResult.toBuilder().syncDuration(Duration.ofSeconds(4)).build());
+    Map<String, SyncResult> result =
+        conversionController.syncTableAcrossCatalogs(conversionConfig, 
conversionSourceProviders);
+    assertEquals(mergedSyncResults, result);
+  }
+
   private SyncResult getLastSyncResult(List<SyncResult> syncResults) {
     return syncResults.get(syncResults.size() - 1);
   }
 
+  /**
+   * Builds a mutable map that assigns the same sync result to every given table format.
+   *
+   * @param tableFormats table format names used as keys
+   * @param syncResult result stored for each key
+   * @return a new map from table format to {@code syncResult}
+   */
+  private Map<String, SyncResult> buildPerTableResult(
+      List<String> tableFormats, SyncResult syncResult) {
+    Map<String, SyncResult> resultsByFormat = new HashMap<>();
+    for (String format : tableFormats) {
+      resultsByFormat.put(format, syncResult);
+    }
+    return resultsByFormat;
+  }
+
   private List<SyncResult> buildSyncResults(List<Instant> instantList) {
     return instantList.stream()
         .map(instant -> buildSyncResult(SyncMode.INCREMENTAL, instant))
@@ -396,6 +521,17 @@ public class TestConversionController {
         .build();
   }
 
+  /**
+   * Builds a successful sync result whose start time lies {@code duration} (whole seconds) before
+   * the current instant.
+   *
+   * @param syncMode sync mode recorded in the result
+   * @param lastSyncedInstant instant recorded as the last synced instant
+   * @param duration sync duration recorded in the result
+   * @return the assembled result with table format sync status SUCCESS
+   */
+  private SyncResult buildSyncResult(
+      SyncMode syncMode, Instant lastSyncedInstant, Duration duration) {
+    Instant startTime = Instant.now().minusSeconds(duration.getSeconds());
+    return SyncResult.builder()
+        .tableFormatSyncStatus(SyncResult.SyncStatus.SUCCESS)
+        .mode(syncMode)
+        .syncStartTime(startTime)
+        .syncDuration(duration)
+        .lastInstantSynced(lastSyncedInstant)
+        .build();
+  }
+
   private InternalSnapshot buildSnapshot(InternalTable internalTable, String 
version) {
     return 
InternalSnapshot.builder().table(internalTable).version(version).build();
   }
@@ -413,6 +549,13 @@ public class TestConversionController {
   }
 
+  /**
+   * Builds a conversion config without any target catalogs by delegating to the three-argument
+   * overload with an empty catalog list.
+   */
   private ConversionConfig getTableSyncConfig(List<String> targetTableFormats, 
SyncMode syncMode) {
+    return getTableSyncConfig(targetTableFormats, syncMode, 
Collections.emptyList());
+  }
+
+  private ConversionConfig getTableSyncConfig(
+      List<String> targetTableFormats,
+      SyncMode syncMode,
+      List<TargetCatalogConfig> targetCatalogs) {
     SourceTable sourceTable =
         SourceTable.builder()
             .name("tablename")
@@ -434,10 +577,29 @@ public class TestConversionController {
     return ConversionConfig.builder()
         .sourceTable(sourceTable)
         .targetTables(targetTables)
+        .targetCatalogs(
+            targetTables.stream()
+                .collect(Collectors.toMap(TargetTable::getId, k -> 
targetCatalogs)))
         .syncMode(syncMode)
         .build();
   }
 
+  /**
+   * Creates a target catalog config whose catalog name, implementation, database name and table
+   * name all end with the given suffix.
+   *
+   * @param suffix text appended to every generated name
+   * @return target catalog config with empty catalog options
+   */
+  private TargetCatalogConfig getTargetCatalog(String suffix) {
+    ExternalCatalogConfig catalogConfig =
+        ExternalCatalogConfig.builder()
+            .catalogName("catalogName-" + suffix)
+            .catalogImpl("catalogImpl-" + suffix)
+            .catalogOptions(Collections.emptyMap())
+            .build();
+    CatalogTableIdentifier tableIdentifier =
+        CatalogTableIdentifier.builder()
+            .databaseName("target-database" + suffix)
+            .tableName("target-tableName" + suffix)
+            .build();
+    return TargetCatalogConfig.builder()
+        .catalogConfig(catalogConfig)
+        .catalogTableIdentifier(tableIdentifier)
+        .build();
+  }
+
   private static <T> ArgumentMatcher<Collection<T>> containsAll(Collection<T> 
expected) {
     return actual -> actual.size() == expected.size() && 
actual.containsAll(expected);
   }
diff --git 
a/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunCatalogSync.java
 
b/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunCatalogSync.java
new file mode 100644
index 00000000..f69d75f5
--- /dev/null
+++ 
b/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunCatalogSync.java
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.xtable.utilities;
+
+import static org.apache.xtable.utilities.RunSync.getCustomConfigurations;
+import static org.apache.xtable.utilities.RunSync.loadHadoopConf;
+import static 
org.apache.xtable.utilities.RunSync.loadTableFormatConversionConfigs;
+
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import lombok.Data;
+import lombok.extern.log4j.Log4j2;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.DefaultParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.ObjectReader;
+import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
+
+import org.apache.xtable.catalog.CatalogConversionFactory;
+import org.apache.xtable.catalog.ExternalCatalogConfigFactory;
+import org.apache.xtable.conversion.ConversionConfig;
+import org.apache.xtable.conversion.ConversionController;
+import org.apache.xtable.conversion.ConversionSourceProvider;
+import org.apache.xtable.conversion.ExternalCatalogConfig;
+import org.apache.xtable.conversion.SourceTable;
+import org.apache.xtable.conversion.TargetCatalogConfig;
+import org.apache.xtable.conversion.TargetTable;
+import org.apache.xtable.model.catalog.CatalogTableIdentifier;
+import org.apache.xtable.model.sync.SyncMode;
+import org.apache.xtable.reflection.ReflectionUtils;
+import org.apache.xtable.spi.extractor.CatalogConversionSource;
+import 
org.apache.xtable.utilities.RunCatalogSync.DatasetConfig.StorageIdentifier;
+import 
org.apache.xtable.utilities.RunCatalogSync.DatasetConfig.TargetTableIdentifier;
+
+/**
+ * Provides a standalone process that reads tables from a source catalog and synchronizes their
+ * state in the target tables. Table format conversion is also supported when a target table uses
+ * a different format than the source table.
+ */
+@Log4j2
+public class RunCatalogSync {
+  /** Jackson mapper used to read the YAML catalog sync configuration. */
+  public static final ObjectMapper YAML_MAPPER = new ObjectMapper(new YAMLFactory());
+
+  // Short names of the supported command line options.
+  private static final String CATALOG_SOURCE_AND_TARGET_CONFIG_PATH = "catalogConfig";
+  private static final String HADOOP_CONFIG_PATH = "hadoopConfig";
+  private static final String CONVERTERS_CONFIG_PATH = "convertersConfig";
+  private static final String HELP_OPTION = "h";
+
+  /**
+   * Conversion source providers created so far, keyed by table format name. Filled on demand by
+   * {@code getConversionSourceProviders} and reused for later datasets.
+   */
+  private static final Map<String, ConversionSourceProvider> CONVERSION_SOURCE_PROVIDERS =
+      new HashMap<>();
+
+  /** Command line options; only the catalog sync config path is required. */
+  private static final Options OPTIONS =
+      new Options()
+          .addRequiredOption(
+              CATALOG_SOURCE_AND_TARGET_CONFIG_PATH,
+              "catalogSyncConfig",
+              true,
+              "The path to a yaml file containing source and target tables catalog configurations "
+                  + "along with the table identifiers that need to be synced")
+          .addOption(
+              HADOOP_CONFIG_PATH,
+              "hadoopConfig",
+              true,
+              "Hadoop config xml file path containing configs necessary to access the "
+                  + "file system. These configs will override the default configs.")
+          .addOption(
+              CONVERTERS_CONFIG_PATH,
+              "convertersConfig",
+              true,
+              "The path to a yaml file containing InternalTable converter configurations. "
+                  + "These configs will override the default")
+          .addOption(HELP_OPTION, "help", false, "Displays help information to run this utility");
+
+  /**
+   * Parses the command line, loads the catalog sync YAML configuration and synchronizes every
+   * configured dataset to its target catalogs.
+   *
+   * <p>The source table of a dataset is taken from its storage identifier when one is given and
+   * is looked up in the source catalog otherwise. A failure while syncing one dataset is logged
+   * and does not stop the remaining datasets.
+   *
+   * @param args command line arguments, see {@code OPTIONS}
+   * @throws IllegalArgumentException if a dataset references a target catalog name that is not
+   *     declared in the list of target catalogs
+   * @throws Exception if reading or parsing one of the configuration files fails
+   */
+  public static void main(String[] args) throws Exception {
+    CommandLineParser parser = new DefaultParser();
+    CommandLine cmd;
+    try {
+      cmd = parser.parse(OPTIONS, args);
+    } catch (ParseException e) {
+      new HelpFormatter().printHelp("xtable.jar", OPTIONS, true);
+      return;
+    }
+
+    if (cmd.hasOption(HELP_OPTION)) {
+      HelpFormatter formatter = new HelpFormatter();
+      formatter.printHelp("RunCatalogSync", OPTIONS);
+      return;
+    }
+
+    DatasetConfig datasetConfig = new DatasetConfig();
+    try (InputStream inputStream =
+        Files.newInputStream(
+            Paths.get(cmd.getOptionValue(CATALOG_SOURCE_AND_TARGET_CONFIG_PATH)))) {
+      ObjectReader objectReader = YAML_MAPPER.readerForUpdating(datasetConfig);
+      objectReader.readValue(inputStream);
+    }
+
+    byte[] customConfig = getCustomConfigurations(cmd, HADOOP_CONFIG_PATH);
+    Configuration hadoopConf = loadHadoopConf(customConfig);
+
+    customConfig = getCustomConfigurations(cmd, CONVERTERS_CONFIG_PATH);
+    RunSync.TableFormatConverters tableFormatConverters =
+        loadTableFormatConversionConfigs(customConfig);
+
+    Map<String, DatasetConfig.Catalog> catalogsByName =
+        datasetConfig.getTargetCatalogs().stream()
+            .collect(Collectors.toMap(DatasetConfig.Catalog::getCatalogName, Function.identity()));
+    ExternalCatalogConfig sourceCatalogConfig = getCatalogConfig(datasetConfig.getSourceCatalog());
+    CatalogConversionSource catalogConversionSource =
+        CatalogConversionFactory.createCatalogConversionSource(sourceCatalogConfig, hadoopConf);
+    ConversionController conversionController = new ConversionController(hadoopConf);
+    for (DatasetConfig.Dataset dataset : datasetConfig.getDatasets()) {
+      DatasetConfig.SourceTableIdentifier sourceIdentifier =
+          dataset.getSourceCatalogTableIdentifier();
+      StorageIdentifier storageIdentifier = sourceIdentifier.getStorageIdentifier();
+      SourceTable sourceTable;
+      if (storageIdentifier != null) {
+        // An explicit storage location takes precedence over a catalog lookup.
+        sourceTable =
+            SourceTable.builder()
+                .name(storageIdentifier.getTableName())
+                .basePath(storageIdentifier.getTableBasePath())
+                .namespace(
+                    storageIdentifier.getNamespace() == null
+                        ? null
+                        : storageIdentifier.getNamespace().split("\\."))
+                .dataPath(storageIdentifier.getTableDataPath())
+                .formatName(storageIdentifier.getTableFormat())
+                .build();
+      } else {
+        sourceTable =
+            catalogConversionSource.getSourceTable(sourceIdentifier.getCatalogTableIdentifier());
+      }
+      List<TargetTable> targetTables = new ArrayList<>();
+      Map<String, List<TargetCatalogConfig>> targetCatalogs = new HashMap<>();
+      for (TargetTableIdentifier targetCatalogTableIdentifier :
+          dataset.getTargetCatalogTableIdentifiers()) {
+        String targetCatalogName = targetCatalogTableIdentifier.getCatalogName();
+        DatasetConfig.Catalog targetCatalog = catalogsByName.get(targetCatalogName);
+        if (targetCatalog == null) {
+          // Fail with a readable message instead of a NullPointerException further down.
+          throw new IllegalArgumentException(
+              String.format(
+                  "Target catalog %s is not defined. Known target catalogs are %s",
+                  targetCatalogName, catalogsByName.keySet()));
+        }
+        TargetTable targetTable =
+            TargetTable.builder()
+                .name(sourceTable.getName())
+                .basePath(sourceTable.getBasePath())
+                .namespace(sourceTable.getNamespace())
+                .formatName(targetCatalogTableIdentifier.getTableFormat())
+                .build();
+        List<TargetCatalogConfig> catalogsForTable = targetCatalogs.get(targetTable.getId());
+        if (catalogsForTable == null) {
+          // Register each target table only once, even when several catalogs point to it.
+          catalogsForTable = new ArrayList<>();
+          targetCatalogs.put(targetTable.getId(), catalogsForTable);
+          targetTables.add(targetTable);
+        }
+        catalogsForTable.add(
+            TargetCatalogConfig.builder()
+                .catalogTableIdentifier(targetCatalogTableIdentifier.getCatalogTableIdentifier())
+                .catalogConfig(getCatalogConfig(targetCatalog))
+                .build());
+      }
+      ConversionConfig conversionConfig =
+          ConversionConfig.builder()
+              .sourceTable(sourceTable)
+              .targetTables(targetTables)
+              .targetCatalogs(targetCatalogs)
+              .syncMode(SyncMode.INCREMENTAL)
+              .build();
+      // Source format first, followed by the target formats, without duplicates.
+      List<String> tableFormats =
+          new ArrayList<>(Collections.singleton(sourceTable.getFormatName()));
+      tableFormats.addAll(
+          targetTables.stream().map(TargetTable::getFormatName).collect(Collectors.toList()));
+      tableFormats = tableFormats.stream().distinct().collect(Collectors.toList());
+      try {
+        conversionController.syncTableAcrossCatalogs(
+            conversionConfig,
+            getConversionSourceProviders(tableFormats, tableFormatConverters, hadoopConf));
+      } catch (Exception e) {
+        log.error(String.format("Error running sync for %s", sourceTable.getBasePath()), e);
+      }
+    }
+  }
+
+  /**
+   * Converts a catalog entry of the YAML configuration into an {@link ExternalCatalogConfig}.
+   *
+   * @param catalog catalog entry read from the configuration
+   * @return config created by {@link ExternalCatalogConfigFactory} when a catalog type is set,
+   *     otherwise a config built from the catalog implementation class of the entry
+   */
+  static ExternalCatalogConfig getCatalogConfig(DatasetConfig.Catalog catalog) {
+    String catalogType = catalog.getCatalogType();
+    if (StringUtils.isEmpty(catalogType)) {
+      // No catalog type given: use the explicitly configured implementation class.
+      return ExternalCatalogConfig.builder()
+          .catalogName(catalog.getCatalogName())
+          .catalogImpl(catalog.getCatalogImpl())
+          .catalogOptions(catalog.getCatalogProperties())
+          .build();
+    }
+    return ExternalCatalogConfigFactory.fromCatalogType(
+        catalogType, catalog.getCatalogName(), catalog.getCatalogProperties());
+  }
+
+  /**
+   * Makes sure a conversion source provider exists for each of the given table formats and
+   * returns the shared provider cache.
+   *
+   * <p>The returned map is the cache itself, so it may also hold providers that were created by
+   * earlier calls for other formats.
+   *
+   * @param tableFormats table formats that need a provider
+   * @param tableFormatConverters converter configuration listing the provider class per format
+   * @param hadoopConf configuration passed to {@code init} of newly created providers
+   * @return providers keyed by table format name
+   * @throws IllegalArgumentException if a format has no entry in {@code tableFormatConverters}
+   */
+  static Map<String, ConversionSourceProvider> getConversionSourceProviders(
+      List<String> tableFormats,
+      RunSync.TableFormatConverters tableFormatConverters,
+      Configuration hadoopConf) {
+    for (String format : tableFormats) {
+      if (!CONVERSION_SOURCE_PROVIDERS.containsKey(format)) {
+        CONVERSION_SOURCE_PROVIDERS.put(
+            format, createConversionSourceProvider(format, tableFormatConverters, hadoopConf));
+      }
+    }
+    return CONVERSION_SOURCE_PROVIDERS;
+  }
+
+  /** Instantiates and initializes the provider configured for a single table format. */
+  private static ConversionSourceProvider<?> createConversionSourceProvider(
+      String tableFormat,
+      RunSync.TableFormatConverters tableFormatConverters,
+      Configuration hadoopConf) {
+    RunSync.TableFormatConverters.ConversionConfig converterConfig =
+        tableFormatConverters.getTableFormatConverters().get(tableFormat);
+    if (converterConfig == null) {
+      throw new IllegalArgumentException(
+          String.format(
+              "Source format %s is not supported. Known source and target formats are %s",
+              tableFormat, tableFormatConverters.getTableFormatConverters().keySet()));
+    }
+    ConversionSourceProvider<?> provider =
+        ReflectionUtils.createInstanceOfClass(converterConfig.conversionSourceProviderClass);
+    provider.init(hadoopConf);
+    return provider;
+  }
+
+  /**
+   * Root of the YAML catalog sync configuration. An instance is filled in place by the YAML
+   * mapper in {@code main}.
+   */
+  @Data
+  public static class DatasetConfig {
+    /** Catalog that source tables are looked up in. */
+    private Catalog sourceCatalog;
+    /** Catalogs that can be referenced by name from the target table identifiers. */
+    private List<Catalog> targetCatalogs;
+    /** Tables to synchronize. */
+    private List<Dataset> datasets;
+
+    /**
+     * Definition of one catalog. When {@code catalogType} is non-empty the catalog config is
+     * created from the type, otherwise {@code catalogImpl} is used (see {@code
+     * getCatalogConfig}).
+     */
+    @Data
+    public static class Catalog {
+      private String catalogName;
+      private String catalogType;
+      private String catalogImpl;
+      private Map<String, String> catalogProperties;
+    }
+
+    /**
+     * Describes a source table directly by its storage location instead of a catalog entry.
+     * {@code namespace} is split on dots by {@code main}.
+     */
+    @Data
+    public static class StorageIdentifier {
+      String tableFormat;
+      String tableBasePath;
+      String tableDataPath;
+      String tableName;
+      // NOTE(review): parsed from the YAML but not read by main in this class.
+      String partitionSpec;
+      String namespace;
+    }
+
+    /**
+     * Identifies the source table of a dataset. {@code main} uses {@code storageIdentifier} when
+     * it is set and falls back to a lookup of {@code catalogTableIdentifier} in the source
+     * catalog otherwise.
+     */
+    @Data
+    public static class SourceTableIdentifier {
+      CatalogTableIdentifier catalogTableIdentifier;
+      StorageIdentifier storageIdentifier;
+    }
+
+    /**
+     * Identifies one target of a dataset: the name of a catalog from {@code targetCatalogs}, the
+     * table format of the target table and the identifier of the table inside that catalog.
+     */
+    @Data
+    public static class TargetTableIdentifier {
+      String catalogName;
+      String tableFormat;
+      CatalogTableIdentifier catalogTableIdentifier;
+    }
+
+    /** One source table together with the target catalog tables it is synchronized to. */
+    @Data
+    public static class Dataset {
+      private SourceTableIdentifier sourceCatalogTableIdentifier;
+      private List<TargetTableIdentifier> targetCatalogTableIdentifiers;
+    }
+  }
+}
diff --git 
a/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunSync.java 
b/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunSync.java
index c84753de..9475b829 100644
--- a/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunSync.java
+++ b/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunSync.java
@@ -195,7 +195,7 @@ public class RunSync {
     }
   }
 
-  private static byte[] getCustomConfigurations(CommandLine cmd, String 
option) throws IOException {
+  static byte[] getCustomConfigurations(CommandLine cmd, String option) throws 
IOException {
     byte[] customConfig = null;
     if (cmd.hasOption(option)) {
       customConfig = Files.readAllBytes(Paths.get(cmd.getOptionValue(option)));
diff --git 
a/xtable-utilities/src/test/java/org/apache/xtable/utilities/TestRunCatalogSync.java
 
b/xtable-utilities/src/test/java/org/apache/xtable/utilities/TestRunCatalogSync.java
new file mode 100644
index 00000000..7b77214f
--- /dev/null
+++ 
b/xtable-utilities/src/test/java/org/apache/xtable/utilities/TestRunCatalogSync.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.xtable.utilities;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+import java.nio.file.Paths;
+
+import lombok.SneakyThrows;
+
+import org.apache.hadoop.conf.Configuration;
+import org.junit.jupiter.api.Test;
+
+import org.apache.xtable.conversion.ExternalCatalogConfig;
+import org.apache.xtable.conversion.SourceTable;
+import org.apache.xtable.model.InternalTable;
+import org.apache.xtable.model.catalog.CatalogTableIdentifier;
+import org.apache.xtable.spi.extractor.CatalogConversionSource;
+import org.apache.xtable.spi.sync.CatalogSyncClient;
+
+/** Smoke test that runs {@link RunCatalogSync} against the {@code catalogConfig.yaml} fixture. */
+class TestRunCatalogSync {
+
+  @SneakyThrows
+  @Test
+  void testMain() {
+    // Resolve the resource through its URI: URL#getPath() keeps percent-encoding, which breaks
+    // when the checkout directory contains spaces or other special characters.
+    String catalogConfigYamlPath =
+        Paths.get(
+                TestRunCatalogSync.class
+                    .getClassLoader()
+                    .getResource("catalogConfig.yaml")
+                    .toURI())
+            .toString();
+    String[] args = {"-catalogConfig", catalogConfigYamlPath};
+    // Ensure yaml gets parsed and no op-sync implemented in TestCatalogImpl is called.
+    assertDoesNotThrow(() -> RunCatalogSync.main(args));
+  }
+
+  /**
+   * Catalog stub referenced by class name from {@code catalogConfig.yaml}. It returns a fixed
+   * ICEBERG source table; all catalog sync operations are no-ops.
+   */
+  public static class TestCatalogImpl
+      implements CatalogConversionSource, CatalogSyncClient<Object> {
+
+    // NOTE(review): presumably invoked reflectively; both arguments are ignored.
+    public TestCatalogImpl(ExternalCatalogConfig catalogConfig, Configuration hadoopConf) {}
+
+    @Override
+    public SourceTable getSourceTable(CatalogTableIdentifier tableIdentifier) {
+      return SourceTable.builder()
+          .name("source_table_name")
+          .basePath("file://base_path/v1/")
+          .formatName("ICEBERG")
+          .build();
+    }
+
+    @Override
+    public String getCatalogName() {
+      return null;
+    }
+
+    @Override
+    public String getStorageDescriptorLocation(Object o) {
+      return null;
+    }
+
+    @Override
+    public boolean hasDatabase(String databaseName) {
+      return false;
+    }
+
+    @Override
+    public void createDatabase(String databaseName) {}
+
+    @Override
+    public Object getTable(CatalogTableIdentifier tableIdentifier) {
+      return null;
+    }
+
+    @Override
+    public void createTable(InternalTable table, CatalogTableIdentifier tableIdentifier) {}
+
+    @Override
+    public void refreshTable(
+        InternalTable table, Object catalogTable, CatalogTableIdentifier tableIdentifier) {}
+
+    @Override
+    public void createOrReplaceTable(InternalTable table, CatalogTableIdentifier tableIdentifier) {}
+
+    @Override
+    public void dropTable(InternalTable table, CatalogTableIdentifier tableIdentifier) {}
+
+    @Override
+    public void close() throws Exception {}
+  }
+}
diff --git a/xtable-utilities/src/test/resources/catalogConfig.yaml 
b/xtable-utilities/src/test/resources/catalogConfig.yaml
new file mode 100644
index 00000000..6a4df0f0
--- /dev/null
+++ b/xtable-utilities/src/test/resources/catalogConfig.yaml
@@ -0,0 +1,75 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+sourceCatalog: # Single source catalog; its catalogImpl is the stub class nested in TestRunCatalogSync.
+  catalogName: "source-1"
+  catalogImpl: "org.apache.xtable.utilities.TestRunCatalogSync$TestCatalogImpl"
+  catalogProperties:
+    key01: "value1"
+    key02: "value2"
+    key03: "value3"
+targetCatalogs: # Three target catalogs (target-1 to target-3), all using the same stub class as the source.
+  - catalogName: "target-1"
+    catalogImpl: 
"org.apache.xtable.utilities.TestRunCatalogSync$TestCatalogImpl"
+    catalogProperties:
+      key11: "value1"
+      key12: "value2"
+      key13: "value3"
+  - catalogName: "target-2"
+    catalogImpl: 
"org.apache.xtable.utilities.TestRunCatalogSync$TestCatalogImpl"
+    catalogProperties:
+      key21: "value1"
+      key22: "value2"
+      key23: "value3"
+  - catalogName: "target-3"
+    catalogImpl: 
"org.apache.xtable.utilities.TestRunCatalogSync$TestCatalogImpl"
+    catalogProperties:
+      key31: "value1"
+      key32: "value2"
+      key33: "value3"
+datasets: # Two entries: the first names its source via catalogTableIdentifier, the second via storageIdentifier.
+  - sourceCatalogTableIdentifier:
+      catalogTableIdentifier:
+        databaseName: "source-database-1"
+        tableName: "source-1"
+    targetCatalogTableIdentifiers: # Each catalogName below matches an entry under targetCatalogs.
+      - catalogName: "target-1"
+        tableFormat: "DELTA"
+        catalogTableIdentifier:
+          databaseName: "target-database-1"
+          tableName: "target-tableName-1"
+      - catalogName: "target-2"
+        tableFormat: "HUDI"
+        catalogTableIdentifier:
+          databaseName: "target-database-2"
+          tableName: "target-tableName-2-delta" # NOTE(review): name ends in "-delta" but tableFormat above is "HUDI" - confirm this is intentional.
+  - sourceCatalogTableIdentifier:
+      storageIdentifier: # Source described by storage path and format instead of a catalog lookup.
+        tableBasePath: s3://tpc-ds-datasets/1GB/hudi/catalog_sales
+        tableName: catalog_sales
+        partitionSpec: cs_sold_date_sk:VALUE
+        tableFormat: "HUDI"
+    targetCatalogTableIdentifiers: # Each catalogName below matches an entry under targetCatalogs.
+      - catalogName: "target-2"
+        tableFormat: "ICEBERG"
+        catalogTableIdentifier:
+          databaseName: "target-database-2"
+          tableName: "target-tableName-2"
+      - catalogName: "target-3"
+        tableFormat: "HUDI"
+        catalogTableIdentifier:
+          databaseName: "target-database-3"
+          tableName: "target-tableName-3"

Reply via email to