This is an automated email from the ASF dual-hosted git repository.

vinish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-xtable.git


The following commit(s) were added to refs/heads/main by this push:
     new 9fa812e3 [590] Add RunCatalogSync utility for synchronizing tables 
across catalogs
9fa812e3 is described below

commit 9fa812e32df3bba25cfa27398c10a7c9eb24e2e2
Author: Vinish Reddy <[email protected]>
AuthorDate: Mon Jan 27 11:15:03 2025 -0800

    [590] Add RunCatalogSync utility for synchronizing tables across catalogs
---
 .../apache/xtable/conversion/ConversionConfig.java |  12 +-
 .../xtable/conversion/ExternalCatalogConfig.java   |  65 ++++
 .../xtable/conversion/TargetCatalogConfig.java     |  21 +-
 .../apache/xtable/model/storage/CatalogType.java   |  24 +-
 .../spi/extractor/CatalogConversionSource.java     |   3 +
 .../apache/xtable/spi/sync/CatalogSyncClient.java  |   3 +
 xtable-core/pom.xml                                |  20 ++
 .../xtable/catalog/CatalogConversionFactory.java   |  96 ++++++
 .../xtable/conversion/ConversionController.java    | 212 +++++++++---
 .../ConversionUtils.java}                          |  27 +-
 .../xtable/iceberg/IcebergCatalogConfig.java       |   8 +-
 .../catalog/TestCatalogConversionFactory.java      | 108 ++++++
 .../conversion/TestConversionController.java       | 174 +++++++++-
 .../org/apache/xtable/testutil/ITTestUtils.java    | 108 ++++++
 ...he.xtable.spi.extractor.CatalogConversionSource |  18 +
 .../org.apache.xtable.spi.sync.CatalogSyncClient   |  18 +
 xtable-utilities/pom.xml                           |  23 ++
 .../apache/xtable/utilities/RunCatalogSync.java    | 365 +++++++++++++++++++++
 .../java/org/apache/xtable/utilities/RunSync.java  |   4 +-
 .../apache/xtable/utilities/ITRunCatalogSync.java  | 142 ++++++++
 .../xtable/utilities/TestRunCatalogSync.java       |  28 +-
 .../src/test/resources/catalogConfig.yaml          |  71 ++++
 22 files changed, 1441 insertions(+), 109 deletions(-)

diff --git 
a/xtable-api/src/main/java/org/apache/xtable/conversion/ConversionConfig.java 
b/xtable-api/src/main/java/org/apache/xtable/conversion/ConversionConfig.java
index 73e2628d..63e9d673 100644
--- 
a/xtable-api/src/main/java/org/apache/xtable/conversion/ConversionConfig.java
+++ 
b/xtable-api/src/main/java/org/apache/xtable/conversion/ConversionConfig.java
@@ -18,7 +18,9 @@
  
 package org.apache.xtable.conversion;
 
+import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 
 import lombok.Builder;
 import lombok.NonNull;
@@ -29,22 +31,30 @@ import com.google.common.base.Preconditions;
 import org.apache.xtable.model.sync.SyncMode;
 
 @Value
+@Builder
 public class ConversionConfig {
   // The source of the sync
   @NonNull SourceTable sourceTable;
   // One or more targets to sync the table metadata to
   List<TargetTable> targetTables;
+  // Each target table can be synced to multiple target catalogs, this is a map 
from
+  // targetTable to target catalogs.
+  Map<TargetTable, List<TargetCatalogConfig>> targetCatalogs;
   // The mode, incremental or snapshot
   SyncMode syncMode;
 
   @Builder
   ConversionConfig(
-      @NonNull SourceTable sourceTable, List<TargetTable> targetTables, 
SyncMode syncMode) {
+      @NonNull SourceTable sourceTable,
+      List<TargetTable> targetTables,
+      Map<TargetTable, List<TargetCatalogConfig>> targetCatalogs,
+      SyncMode syncMode) {
     this.sourceTable = sourceTable;
     this.targetTables = targetTables;
     Preconditions.checkArgument(
         targetTables != null && !targetTables.isEmpty(),
         "Please provide at-least one format to sync");
+    this.targetCatalogs = targetCatalogs == null ? Collections.emptyMap() : 
targetCatalogs;
     this.syncMode = syncMode == null ? SyncMode.INCREMENTAL : syncMode;
   }
 }
diff --git 
a/xtable-api/src/main/java/org/apache/xtable/conversion/ExternalCatalogConfig.java
 
b/xtable-api/src/main/java/org/apache/xtable/conversion/ExternalCatalogConfig.java
new file mode 100644
index 00000000..b525d831
--- /dev/null
+++ 
b/xtable-api/src/main/java/org/apache/xtable/conversion/ExternalCatalogConfig.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.xtable.conversion;
+
+import java.util.Collections;
+import java.util.Map;
+
+import lombok.Builder;
+import lombok.NonNull;
+import lombok.Value;
+
+/**
+ * Defines the configuration for an external catalog. The user needs to populate 
at least one of {@link
+ * ExternalCatalogConfig#catalogType} or {@link 
ExternalCatalogConfig#catalogSyncClientImpl}
+ */
+@Value
+@Builder
+public class ExternalCatalogConfig {
+  /**
+   * A user-defined unique identifier for the catalog, allows user to sync 
table to multiple
+   * catalogs of the same name/type, e.g. HMS catalog with url1, HMS catalog 
with url2
+   */
+  @NonNull String catalogId;
+
+  /**
+   * The type of the catalog. If the catalogType implementation exists in 
XTable, the implementation
+   * class will be inferred.
+   */
+  String catalogType;
+
+  /**
+   * (Optional) A fully qualified class name that implements the interface for 
{@link
+   * org.apache.xtable.spi.sync.CatalogSyncClient}, it can be used if the 
implementation for
+   * catalogType doesn't exist in XTable.
+   */
+  String catalogSyncClientImpl;
+
+  /**
+   * (Optional) A fully qualified class name that implements the interface for 
{@link
+   * org.apache.xtable.spi.extractor.CatalogConversionSource} it can be used 
if the implementation
+   * for catalogType doesn't exist in XTable.
+   */
+  String catalogConversionSourceImpl;
+
+  /**
+   * The properties for this catalog, used for providing any custom behaviour 
during catalog sync
+   */
+  @NonNull @Builder.Default Map<String, String> catalogProperties = 
Collections.emptyMap();
+}
diff --git 
a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergCatalogConfig.java 
b/xtable-api/src/main/java/org/apache/xtable/conversion/TargetCatalogConfig.java
similarity index 64%
copy from 
xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergCatalogConfig.java
copy to 
xtable-api/src/main/java/org/apache/xtable/conversion/TargetCatalogConfig.java
index d5d7a3c5..d6687523 100644
--- 
a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergCatalogConfig.java
+++ 
b/xtable-api/src/main/java/org/apache/xtable/conversion/TargetCatalogConfig.java
@@ -16,21 +16,24 @@
  * limitations under the License.
  */
  
-package org.apache.xtable.iceberg;
-
-import java.util.Collections;
-import java.util.Map;
+package org.apache.xtable.conversion;
 
 import lombok.Builder;
 import lombok.NonNull;
 import lombok.Value;
 
-import org.apache.xtable.conversion.CatalogConfig;
+import org.apache.xtable.model.catalog.CatalogTableIdentifier;
 
+/**
+ * TargetCatalogConfig contains the parameters that are required when syncing 
{@link TargetTable} to
+ * a catalog.
+ */
 @Value
 @Builder
-public class IcebergCatalogConfig implements CatalogConfig {
-  @NonNull String catalogImpl;
-  @NonNull String catalogName;
-  @NonNull @Builder.Default Map<String, String> catalogOptions = 
Collections.emptyMap();
+public class TargetCatalogConfig {
+  /** The tableIdentifier that will be used when syncing {@link TargetTable} 
to the catalog. */
+  @NonNull CatalogTableIdentifier catalogTableIdentifier;
+
+  /** Configuration for the catalog. */
+  @NonNull ExternalCatalogConfig catalogConfig;
 }
diff --git 
a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergCatalogConfig.java 
b/xtable-api/src/main/java/org/apache/xtable/model/storage/CatalogType.java
similarity index 65%
copy from 
xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergCatalogConfig.java
copy to 
xtable-api/src/main/java/org/apache/xtable/model/storage/CatalogType.java
index d5d7a3c5..e2b028dc 100644
--- 
a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergCatalogConfig.java
+++ b/xtable-api/src/main/java/org/apache/xtable/model/storage/CatalogType.java
@@ -16,21 +16,13 @@
  * limitations under the License.
  */
  
-package org.apache.xtable.iceberg;
+package org.apache.xtable.model.storage;
 
-import java.util.Collections;
-import java.util.Map;
-
-import lombok.Builder;
-import lombok.NonNull;
-import lombok.Value;
-
-import org.apache.xtable.conversion.CatalogConfig;
-
-@Value
-@Builder
-public class IcebergCatalogConfig implements CatalogConfig {
-  @NonNull String catalogImpl;
-  @NonNull String catalogName;
-  @NonNull @Builder.Default Map<String, String> catalogOptions = 
Collections.emptyMap();
+/**
+ * Default constants for supported catalog types.
+ *
+ * @since 0.1
+ */
+public class CatalogType {
+  public static final String STORAGE = "STORAGE";
 }
diff --git 
a/xtable-api/src/main/java/org/apache/xtable/spi/extractor/CatalogConversionSource.java
 
b/xtable-api/src/main/java/org/apache/xtable/spi/extractor/CatalogConversionSource.java
index 616f3d45..1525e6fa 100644
--- 
a/xtable-api/src/main/java/org/apache/xtable/spi/extractor/CatalogConversionSource.java
+++ 
b/xtable-api/src/main/java/org/apache/xtable/spi/extractor/CatalogConversionSource.java
@@ -29,4 +29,7 @@ import org.apache.xtable.model.catalog.CatalogTableIdentifier;
 public interface CatalogConversionSource {
   /** Returns the source table object present in the catalog. */
   SourceTable getSourceTable(CatalogTableIdentifier tableIdentifier);
+
+  /** Returns the {@link org.apache.xtable.model.storage.CatalogType} for the 
catalog conversion */
+  String getCatalogType();
 }
diff --git 
a/xtable-api/src/main/java/org/apache/xtable/spi/sync/CatalogSyncClient.java 
b/xtable-api/src/main/java/org/apache/xtable/spi/sync/CatalogSyncClient.java
index 62de9379..cc322854 100644
--- a/xtable-api/src/main/java/org/apache/xtable/spi/sync/CatalogSyncClient.java
+++ b/xtable-api/src/main/java/org/apache/xtable/spi/sync/CatalogSyncClient.java
@@ -34,6 +34,9 @@ public interface CatalogSyncClient<TABLE> extends 
AutoCloseable {
    */
   String getCatalogId();
 
+  /** Returns the {@link org.apache.xtable.model.storage.CatalogType} the 
client syncs to */
+  String getCatalogType();
+
   /** Returns the storage location of the table synced to the catalog. */
   String getStorageLocation(TABLE table);
 
diff --git a/xtable-core/pom.xml b/xtable-core/pom.xml
index f277495e..80de2299 100644
--- a/xtable-core/pom.xml
+++ b/xtable-core/pom.xml
@@ -174,4 +174,24 @@
             <scope>test</scope>
         </dependency>
     </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-jar-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>test-jar</goal>
+                        </goals>
+                        <phase>test-compile</phase>
+                    </execution>
+                </executions>
+                <configuration>
+                    <skip>false</skip>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
 </project>
diff --git 
a/xtable-core/src/main/java/org/apache/xtable/catalog/CatalogConversionFactory.java
 
b/xtable-core/src/main/java/org/apache/xtable/catalog/CatalogConversionFactory.java
new file mode 100644
index 00000000..add95c21
--- /dev/null
+++ 
b/xtable-core/src/main/java/org/apache/xtable/catalog/CatalogConversionFactory.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.xtable.catalog;
+
+import java.util.ServiceLoader;
+import java.util.function.Function;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+
+import org.apache.xtable.conversion.ExternalCatalogConfig;
+import org.apache.xtable.exception.NotSupportedException;
+import org.apache.xtable.reflection.ReflectionUtils;
+import org.apache.xtable.spi.extractor.CatalogConversionSource;
+import org.apache.xtable.spi.sync.CatalogSyncClient;
+
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public class CatalogConversionFactory {
+  private static final CatalogConversionFactory INSTANCE = new 
CatalogConversionFactory();
+
+  public static CatalogConversionFactory getInstance() {
+    return INSTANCE;
+  }
+
+  /**
+   * Returns an implementation class for {@link CatalogConversionSource} 
that's used for converting
+   * table definitions in the catalog to {@link 
org.apache.xtable.conversion.SourceTable} object.
+   *
+   * @param sourceCatalogConfig configuration for the source catalog
+   * @param configuration hadoop configuration
+   */
+  public static CatalogConversionSource createCatalogConversionSource(
+      ExternalCatalogConfig sourceCatalogConfig, Configuration configuration) {
+    if (!StringUtils.isEmpty(sourceCatalogConfig.getCatalogType())) {
+      return findInstance(
+          CatalogConversionSource.class,
+          sourceCatalogConfig.getCatalogType(),
+          CatalogConversionSource::getCatalogType);
+    }
+    return ReflectionUtils.createInstanceOfClass(
+        sourceCatalogConfig.getCatalogConversionSourceImpl(), 
sourceCatalogConfig, configuration);
+  }
+
+  /**
+   * Returns an implementation class for {@link CatalogSyncClient} that's used 
for syncing {@link
+   * org.apache.xtable.conversion.TargetTable} to a catalog.
+   *
+   * @param targetCatalogConfig configuration for the target catalog
+   * @param configuration hadoop configuration
+   */
+  public <TABLE> CatalogSyncClient<TABLE> createCatalogSyncClient(
+      ExternalCatalogConfig targetCatalogConfig, String tableFormat, 
Configuration configuration) {
+    if (!StringUtils.isEmpty(targetCatalogConfig.getCatalogType())) {
+      return findInstance(
+          CatalogSyncClient.class,
+          targetCatalogConfig.getCatalogType(),
+          CatalogSyncClient::getCatalogType);
+    }
+    return ReflectionUtils.createInstanceOfClass(
+        targetCatalogConfig.getCatalogSyncClientImpl(),
+        targetCatalogConfig,
+        tableFormat,
+        configuration);
+  }
+
+  private static <T> T findInstance(
+      Class<T> serviceClass, String catalogType, Function<T, String> 
catalogTypeExtractor) {
+    ServiceLoader<T> loader = ServiceLoader.load(serviceClass);
+    for (T instance : loader) {
+      String instanceCatalogType = catalogTypeExtractor.apply(instance);
+      if (catalogType.equals(instanceCatalogType)) {
+        return instance;
+      }
+    }
+    throw new NotSupportedException("catalogType is not yet supported: " + 
catalogType);
+  }
+}
diff --git 
a/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionController.java
 
b/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionController.java
index 222652a6..1db145ee 100644
--- 
a/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionController.java
+++ 
b/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionController.java
@@ -18,6 +18,8 @@
  
 package org.apache.xtable.conversion;
 
+import static 
org.apache.xtable.conversion.ConversionUtils.convertToSourceTable;
+
 import java.io.IOException;
 import java.time.Instant;
 import java.util.Collection;
@@ -37,15 +39,19 @@ import lombok.extern.log4j.Log4j2;
 
 import org.apache.hadoop.conf.Configuration;
 
+import org.apache.xtable.catalog.CatalogConversionFactory;
 import org.apache.xtable.exception.ReadException;
 import org.apache.xtable.model.IncrementalTableChanges;
 import org.apache.xtable.model.InstantsForIncrementalSync;
 import org.apache.xtable.model.InternalSnapshot;
+import org.apache.xtable.model.catalog.CatalogTableIdentifier;
 import org.apache.xtable.model.metadata.TableSyncMetadata;
 import org.apache.xtable.model.sync.SyncMode;
 import org.apache.xtable.model.sync.SyncResult;
 import org.apache.xtable.spi.extractor.ConversionSource;
 import org.apache.xtable.spi.extractor.ExtractFromSource;
+import org.apache.xtable.spi.sync.CatalogSync;
+import org.apache.xtable.spi.sync.CatalogSyncClient;
 import org.apache.xtable.spi.sync.ConversionTarget;
 import org.apache.xtable.spi.sync.TableFormatSync;
 
@@ -64,10 +70,17 @@ import org.apache.xtable.spi.sync.TableFormatSync;
 public class ConversionController {
   private final Configuration conf;
   private final ConversionTargetFactory conversionTargetFactory;
+  private final CatalogConversionFactory catalogConversionFactory;
   private final TableFormatSync tableFormatSync;
+  private final CatalogSync catalogSync;
 
   public ConversionController(Configuration conf) {
-    this(conf, ConversionTargetFactory.getInstance(), 
TableFormatSync.getInstance());
+    this(
+        conf,
+        ConversionTargetFactory.getInstance(),
+        CatalogConversionFactory.getInstance(),
+        TableFormatSync.getInstance(),
+        CatalogSync.getInstance());
   }
 
   /**
@@ -89,57 +102,146 @@ public class ConversionController {
     try (ConversionSource<COMMIT> conversionSource =
         
conversionSourceProvider.getConversionSourceInstance(config.getSourceTable())) {
       ExtractFromSource<COMMIT> source = 
ExtractFromSource.of(conversionSource);
+      return syncTableFormats(config, source, config.getSyncMode());
+    } catch (IOException ioException) {
+      throw new ReadException("Failed to close source converter", ioException);
+    }
+  }
 
-      Map<String, ConversionTarget> conversionTargetByFormat =
-          config.getTargetTables().stream()
-              .collect(
-                  Collectors.toMap(
-                      TargetTable::getFormatName,
-                      targetTable -> 
conversionTargetFactory.createForFormat(targetTable, conf)));
-      // State for each TableFormat
-      Map<String, Optional<TableSyncMetadata>> lastSyncMetadataByFormat =
-          conversionTargetByFormat.entrySet().stream()
-              .collect(
-                  Collectors.toMap(
-                      Map.Entry::getKey, entry -> 
entry.getValue().getTableMetadata()));
-      Map<String, ConversionTarget> formatsToSyncIncrementally =
-          getFormatsToSyncIncrementally(
-              config,
-              conversionTargetByFormat,
-              lastSyncMetadataByFormat,
-              source.getConversionSource());
-      Map<String, ConversionTarget> formatsToSyncBySnapshot =
-          conversionTargetByFormat.entrySet().stream()
-              .filter(entry -> 
!formatsToSyncIncrementally.containsKey(entry.getKey()))
-              .collect(Collectors.toMap(Map.Entry::getKey, 
Map.Entry::getValue));
-      SyncResultForTableFormats syncResultForSnapshotSync =
-          formatsToSyncBySnapshot.isEmpty()
-              ? SyncResultForTableFormats.builder().build()
-              : syncSnapshot(formatsToSyncBySnapshot, source);
-      SyncResultForTableFormats syncResultForIncrementalSync =
-          formatsToSyncIncrementally.isEmpty()
-              ? SyncResultForTableFormats.builder().build()
-              : syncIncrementalChanges(
-                  formatsToSyncIncrementally, lastSyncMetadataByFormat, 
source);
-      Map<String, SyncResult> syncResultsMerged =
-          new HashMap<>(syncResultForIncrementalSync.getLastSyncResult());
-      syncResultsMerged.putAll(syncResultForSnapshotSync.getLastSyncResult());
-      String successfulSyncs =
-          getFormatsWithStatusCode(syncResultsMerged, 
SyncResult.SyncStatusCode.SUCCESS);
-      if (!successfulSyncs.isEmpty()) {
-        log.info("Sync is successful for the following formats {}", 
successfulSyncs);
-      }
-      String failedSyncs =
-          getFormatsWithStatusCode(syncResultsMerged, 
SyncResult.SyncStatusCode.ERROR);
-      if (!failedSyncs.isEmpty()) {
-        log.error("Sync failed for the following formats {}", failedSyncs);
+  /**
+   * Synchronizes the source table in conversion config to multiple target 
catalogs. If the
+   * configuration for the target table uses a different table format, 
synchronizes the table format
+   * first before syncing it to target catalog
+   *
+   * @param config A per table level config containing source table, target 
tables, target catalogs
+   *     and syncMode.
+   * @param conversionSourceProvider A provider for the {@link 
ConversionSource} instance for each
+   *     tableFormat, {@link ConversionSourceProvider#init(Configuration)} 
must be called before
+   *     calling this method.
+   * @return Returns a map containing the table format, and its sync result. 
Run sync for a table
+   *     with the provided per table level configuration.
+   */
+  public Map<String, SyncResult> syncTableAcrossCatalogs(
+      ConversionConfig config, Map<String, ConversionSourceProvider> 
conversionSourceProvider) {
+    if (config.getTargetTables() == null || 
config.getTargetTables().isEmpty()) {
+      throw new IllegalArgumentException("Please provide at-least one format 
to sync");
+    }
+    try (ConversionSource conversionSource =
+        conversionSourceProvider
+            .get(config.getSourceTable().getFormatName())
+            .getConversionSourceInstance(config.getSourceTable())) {
+      ExtractFromSource source = ExtractFromSource.of(conversionSource);
+      Map<String, SyncResult> tableFormatSyncResults =
+          syncTableFormats(config, source, config.getSyncMode());
+      Map<String, SyncResult> catalogSyncResults = new HashMap<>();
+      for (TargetTable targetTable : config.getTargetTables()) {
+        Map<CatalogTableIdentifier, CatalogSyncClient> catalogSyncClients =
+            config.getTargetCatalogs().get(targetTable).stream()
+                .collect(
+                    Collectors.toMap(
+                        TargetCatalogConfig::getCatalogTableIdentifier,
+                        targetCatalog ->
+                            catalogConversionFactory.createCatalogSyncClient(
+                                targetCatalog.getCatalogConfig(),
+                                targetTable.getFormatName(),
+                                conf)));
+        catalogSyncResults.put(
+            targetTable.getFormatName(),
+            syncCatalogsForTargetTable(
+                targetTable,
+                catalogSyncClients,
+                conversionSourceProvider.get(targetTable.getFormatName())));
       }
-      return syncResultsMerged;
+      mergeSyncResults(tableFormatSyncResults, catalogSyncResults);
+      return tableFormatSyncResults;
     } catch (IOException ioException) {
       throw new ReadException("Failed to close source converter", ioException);
     }
   }
 
+  /**
+   * Synchronizes the given source table format metadata in ConversionConfig 
to multiple target
+   * formats.
+   *
+   * @param config A per table level config containing tableBasePath, 
partitionFieldSpecConfig,
+   *     targetTableFormats and syncMode.
+   * @param source An extractor class for {@link ConversionSource} and allows 
fetching current
+   *     snapshot or incremental table changes.
+   * @param syncMode sync mode is either FULL or INCREMENTAL.
+   * @return Returns a map containing the table format, and its sync result.
+   */
+  private <COMMIT> Map<String, SyncResult> syncTableFormats(
+      ConversionConfig config, ExtractFromSource<COMMIT> source, SyncMode 
syncMode) {
+    Map<String, ConversionTarget> conversionTargetByFormat =
+        config.getTargetTables().stream()
+            .filter(
+                targetTable ->
+                    
!targetTable.getFormatName().equals(config.getSourceTable().getFormatName()))
+            .collect(
+                Collectors.toMap(
+                    TargetTable::getFormatName,
+                    targetTable -> 
conversionTargetFactory.createForFormat(targetTable, conf)));
+
+    Map<String, Optional<TableSyncMetadata>> lastSyncMetadataByFormat =
+        conversionTargetByFormat.entrySet().stream()
+            .collect(
+                Collectors.toMap(Map.Entry::getKey, entry -> 
entry.getValue().getTableMetadata()));
+    Map<String, ConversionTarget> formatsToSyncIncrementally =
+        getFormatsToSyncIncrementally(
+            syncMode,
+            conversionTargetByFormat,
+            lastSyncMetadataByFormat,
+            source.getConversionSource());
+    Map<String, ConversionTarget> formatsToSyncBySnapshot =
+        conversionTargetByFormat.entrySet().stream()
+            .filter(entry -> 
!formatsToSyncIncrementally.containsKey(entry.getKey()))
+            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+    SyncResultForTableFormats syncResultForSnapshotSync =
+        formatsToSyncBySnapshot.isEmpty()
+            ? SyncResultForTableFormats.builder().build()
+            : syncSnapshot(formatsToSyncBySnapshot, source);
+    SyncResultForTableFormats syncResultForIncrementalSync =
+        formatsToSyncIncrementally.isEmpty()
+            ? SyncResultForTableFormats.builder().build()
+            : syncIncrementalChanges(formatsToSyncIncrementally, 
lastSyncMetadataByFormat, source);
+    Map<String, SyncResult> syncResultsMerged =
+        new HashMap<>(syncResultForIncrementalSync.getLastSyncResult());
+    syncResultsMerged.putAll(syncResultForSnapshotSync.getLastSyncResult());
+    String successfulSyncs =
+        getFormatsWithStatusCode(syncResultsMerged, 
SyncResult.SyncStatusCode.SUCCESS);
+    if (!successfulSyncs.isEmpty()) {
+      log.info("Sync is successful for the following formats {}", 
successfulSyncs);
+    }
+    String failedSyncs =
+        getFormatsWithStatusCode(syncResultsMerged, 
SyncResult.SyncStatusCode.ERROR);
+    if (!failedSyncs.isEmpty()) {
+      log.error("Sync failed for the following formats {}", failedSyncs);
+    }
+    return syncResultsMerged;
+  }
+
+  /**
+   * Synchronizes the target table to multiple target catalogs.
+   *
+   * @param targetTable target table that needs to be synced.
+   * @param catalogSyncClients Collection of catalog sync clients along with 
their table identifiers
+   *     for each target catalog.
+   * @param conversionSourceProvider A provider for the {@link 
ConversionSource} instance for the
+   *     table format of targetTable.
+   */
+  private SyncResult syncCatalogsForTargetTable(
+      TargetTable targetTable,
+      Map<CatalogTableIdentifier, CatalogSyncClient> catalogSyncClients,
+      ConversionSourceProvider conversionSourceProvider) {
+    return catalogSync.syncTable(
+        catalogSyncClients,
+        // We get the latest state of InternalTable for TargetTable
+        // and then synchronize it to catalogSyncClients.
+        conversionSourceProvider
+            .getConversionSourceInstance(convertToSourceTable(targetTable))
+            .getCurrentTable());
+  }
+
   private static String getFormatsWithStatusCode(
       Map<String, SyncResult> syncResultsMerged, SyncResult.SyncStatusCode 
statusCode) {
     return syncResultsMerged.entrySet().stream()
@@ -149,11 +251,11 @@ public class ConversionController {
   }
 
   private <COMMIT> Map<String, ConversionTarget> getFormatsToSyncIncrementally(
-      ConversionConfig conversionConfig,
+      SyncMode syncMode,
       Map<String, ConversionTarget> conversionTargetByFormat,
       Map<String, Optional<TableSyncMetadata>> lastSyncMetadataByFormat,
       ConversionSource<COMMIT> conversionSource) {
-    if (conversionConfig.getSyncMode() == SyncMode.FULL) {
+    if (syncMode == SyncMode.FULL) {
       // Full sync requested by config, hence no incremental sync.
       return Collections.emptyMap();
     }
@@ -268,6 +370,22 @@ public class ConversionController {
         .build();
   }
 
+  private void mergeSyncResults(
+      Map<String, SyncResult> syncResultsMerged, Map<String, SyncResult> 
catalogSyncResults) {
+    catalogSyncResults.forEach(
+        (tableFormat, catalogSyncResult) -> {
+          syncResultsMerged.computeIfPresent(
+              tableFormat,
+              (k, syncResult) ->
+                  syncResult.toBuilder()
+                      .syncDuration(
+                          
syncResult.getSyncDuration().plus(catalogSyncResult.getSyncDuration()))
+                      
.catalogSyncStatusList(catalogSyncResult.getCatalogSyncStatusList())
+                      .build());
+          syncResultsMerged.computeIfAbsent(tableFormat, k -> 
catalogSyncResult);
+        });
+  }
+
   @Value
   @Builder
   private static class SyncResultForTableFormats {
diff --git 
a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergCatalogConfig.java 
b/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionUtils.java
similarity index 65%
copy from 
xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergCatalogConfig.java
copy to 
xtable-core/src/main/java/org/apache/xtable/conversion/ConversionUtils.java
index d5d7a3c5..f21be670 100644
--- 
a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergCatalogConfig.java
+++ 
b/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionUtils.java
@@ -16,21 +16,18 @@
  * limitations under the License.
  */
  
-package org.apache.xtable.iceberg;
+package org.apache.xtable.conversion;
 
-import java.util.Collections;
-import java.util.Map;
+public class ConversionUtils {
 
-import lombok.Builder;
-import lombok.NonNull;
-import lombok.Value;
-
-import org.apache.xtable.conversion.CatalogConfig;
-
-@Value
-@Builder
-public class IcebergCatalogConfig implements CatalogConfig {
-  @NonNull String catalogImpl;
-  @NonNull String catalogName;
-  @NonNull @Builder.Default Map<String, String> catalogOptions = 
Collections.emptyMap();
+  public static SourceTable convertToSourceTable(TargetTable table) {
+    return new SourceTable(
+        table.getName(),
+        table.getFormatName(),
+        table.getBasePath(),
+        table.getBasePath(),
+        table.getNamespace(),
+        table.getCatalogConfig(),
+        table.getAdditionalProperties());
+  }
 }
diff --git 
a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergCatalogConfig.java 
b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergCatalogConfig.java
index d5d7a3c5..e0ec7762 100644
--- 
a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergCatalogConfig.java
+++ 
b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergCatalogConfig.java
@@ -27,10 +27,16 @@ import lombok.Value;
 
 import org.apache.xtable.conversion.CatalogConfig;
 
+/**
+ * Iceberg requires a catalog to perform any operation; if no catalog is 
provided, the default
+ * catalog (HadoopCatalog or storage-based catalog) is used. To sync 
Iceberg to multiple
+ * catalogs, use {@link 
org.apache.xtable.conversion.ExternalCatalogConfig} instead, which
+ * allows syncing the latest version of Iceberg metadata to multiple catalogs.
+ */
 @Value
 @Builder
 public class IcebergCatalogConfig implements CatalogConfig {
-  @NonNull String catalogImpl;
   @NonNull String catalogName;
+  @NonNull String catalogImpl;
   @NonNull @Builder.Default Map<String, String> catalogOptions = 
Collections.emptyMap();
 }
diff --git 
a/xtable-core/src/test/java/org/apache/xtable/catalog/TestCatalogConversionFactory.java
 
b/xtable-core/src/test/java/org/apache/xtable/catalog/TestCatalogConversionFactory.java
new file mode 100644
index 00000000..1d05666b
--- /dev/null
+++ 
b/xtable-core/src/test/java/org/apache/xtable/catalog/TestCatalogConversionFactory.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.xtable.catalog;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.util.Collections;
+
+import org.apache.hadoop.conf.Configuration;
+import org.junit.jupiter.api.Test;
+
+import org.apache.xtable.conversion.ExternalCatalogConfig;
+import org.apache.xtable.conversion.TargetCatalogConfig;
+import org.apache.xtable.model.catalog.ThreePartHierarchicalTableIdentifier;
+import org.apache.xtable.spi.extractor.CatalogConversionSource;
+import org.apache.xtable.spi.sync.CatalogSyncClient;
+import org.apache.xtable.testutil.ITTestUtils;
+import org.apache.xtable.testutil.ITTestUtils.TestCatalogConversionSourceImpl;
+import org.apache.xtable.testutil.ITTestUtils.TestCatalogSyncImpl;
+
+class TestCatalogConversionFactory {
+
+  @Test
+  void createCatalogConversionSource() {
+    ExternalCatalogConfig sourceCatalog =
+        ExternalCatalogConfig.builder()
+            .catalogId("catalogId")
+            
.catalogConversionSourceImpl(TestCatalogConversionSourceImpl.class.getName())
+            .catalogProperties(Collections.emptyMap())
+            .build();
+    CatalogConversionSource catalogConversionSource =
+        CatalogConversionFactory.createCatalogConversionSource(sourceCatalog, 
new Configuration());
+    assertEquals(
+        catalogConversionSource.getClass().getName(),
+        TestCatalogConversionSourceImpl.class.getName());
+  }
+
+  @Test
+  void createCatalogConversionSourceForCatalogType() {
+    ExternalCatalogConfig sourceCatalog =
+        ExternalCatalogConfig.builder()
+            .catalogId("catalogId")
+            .catalogType(ITTestUtils.TEST_CATALOG_TYPE)
+            .catalogProperties(Collections.emptyMap())
+            .build();
+    CatalogConversionSource catalogConversionSource =
+        CatalogConversionFactory.createCatalogConversionSource(sourceCatalog, 
new Configuration());
+    assertEquals(
+        catalogConversionSource.getClass().getName(),
+        TestCatalogConversionSourceImpl.class.getName());
+  }
+
+  @Test
+  void createCatalogSyncClient() {
+    TargetCatalogConfig targetCatalogConfig =
+        TargetCatalogConfig.builder()
+            .catalogConfig(
+                ExternalCatalogConfig.builder()
+                    .catalogId("catalogId")
+                    .catalogSyncClientImpl(TestCatalogSyncImpl.class.getName())
+                    .catalogProperties(Collections.emptyMap())
+                    .build())
+            .catalogTableIdentifier(
+                new ThreePartHierarchicalTableIdentifier("target-database", 
"target-tableName"))
+            .build();
+    CatalogSyncClient catalogSyncClient =
+        CatalogConversionFactory.getInstance()
+            .createCatalogSyncClient(
+                targetCatalogConfig.getCatalogConfig(), "TABLE_FORMAT", new 
Configuration());
+    assertEquals(catalogSyncClient.getClass().getName(), 
TestCatalogSyncImpl.class.getName());
+  }
+
+  @Test
+  void createCatalogSyncClientForCatalogType() {
+    TargetCatalogConfig targetCatalogConfig =
+        TargetCatalogConfig.builder()
+            .catalogConfig(
+                ExternalCatalogConfig.builder()
+                    .catalogId("catalogId")
+                    .catalogType(ITTestUtils.TEST_CATALOG_TYPE)
+                    .catalogProperties(Collections.emptyMap())
+                    .build())
+            .catalogTableIdentifier(
+                new ThreePartHierarchicalTableIdentifier("target-database", 
"target-tableName"))
+            .build();
+    CatalogSyncClient catalogSyncClient =
+        CatalogConversionFactory.getInstance()
+            .createCatalogSyncClient(
+                targetCatalogConfig.getCatalogConfig(), "TABLE_FORMAT", new 
Configuration());
+    assertEquals(catalogSyncClient.getClass().getName(), 
TestCatalogSyncImpl.class.getName());
+  }
+}
diff --git 
a/xtable-core/src/test/java/org/apache/xtable/conversion/TestConversionController.java
 
b/xtable-core/src/test/java/org/apache/xtable/conversion/TestConversionController.java
index caba8046..0f34103e 100644
--- 
a/xtable-core/src/test/java/org/apache/xtable/conversion/TestConversionController.java
+++ 
b/xtable-core/src/test/java/org/apache/xtable/conversion/TestConversionController.java
@@ -18,8 +18,12 @@
  
 package org.apache.xtable.conversion;
 
+import static 
org.apache.xtable.conversion.ConversionUtils.convertToSourceTable;
+import static org.apache.xtable.model.storage.TableFormat.DELTA;
 import static org.apache.xtable.model.storage.TableFormat.HUDI;
+import static org.apache.xtable.model.storage.TableFormat.ICEBERG;
 import static org.junit.jupiter.api.Assertions.*;
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.argThat;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.mock;
@@ -36,6 +40,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -43,17 +48,23 @@ import org.apache.hadoop.conf.Configuration;
 import org.junit.jupiter.api.Test;
 import org.mockito.ArgumentMatcher;
 
+import com.google.common.collect.ImmutableMap;
+
+import org.apache.xtable.catalog.CatalogConversionFactory;
 import org.apache.xtable.model.CommitsBacklog;
 import org.apache.xtable.model.IncrementalTableChanges;
 import org.apache.xtable.model.InstantsForIncrementalSync;
 import org.apache.xtable.model.InternalSnapshot;
 import org.apache.xtable.model.InternalTable;
 import org.apache.xtable.model.TableChange;
+import org.apache.xtable.model.catalog.ThreePartHierarchicalTableIdentifier;
 import org.apache.xtable.model.metadata.TableSyncMetadata;
 import org.apache.xtable.model.storage.TableFormat;
 import org.apache.xtable.model.sync.SyncMode;
 import org.apache.xtable.model.sync.SyncResult;
 import org.apache.xtable.spi.extractor.ConversionSource;
+import org.apache.xtable.spi.sync.CatalogSync;
+import org.apache.xtable.spi.sync.CatalogSyncClient;
 import org.apache.xtable.spi.sync.ConversionTarget;
 import org.apache.xtable.spi.sync.TableFormatSync;
 
@@ -62,12 +73,22 @@ public class TestConversionController {
   private final Configuration mockConf = mock(Configuration.class);
   private final ConversionSourceProvider<Instant> mockConversionSourceProvider 
=
       mock(ConversionSourceProvider.class);
+  private final ConversionSourceProvider<Instant> 
mockConversionSourceProvider2 =
+      mock(ConversionSourceProvider.class);
+  private final ConversionSourceProvider<Instant> 
mockConversionSourceProvider3 =
+      mock(ConversionSourceProvider.class);
+
   private final ConversionSource<Instant> mockConversionSource = 
mock(ConversionSource.class);
   private final ConversionTargetFactory mockConversionTargetFactory =
       mock(ConversionTargetFactory.class);
+  private final CatalogConversionFactory mockCatalogConversionFactory =
+      mock(CatalogConversionFactory.class);
   private final TableFormatSync tableFormatSync = mock(TableFormatSync.class);
+  private final CatalogSync catalogSync = mock(CatalogSync.class);
   private final ConversionTarget mockConversionTarget1 = 
mock(ConversionTarget.class);
   private final ConversionTarget mockConversionTarget2 = 
mock(ConversionTarget.class);
+  private final CatalogSyncClient mockCatalogSyncClient1 = 
mock(CatalogSyncClient.class);
+  private final CatalogSyncClient mockCatalogSyncClient2 = 
mock(CatalogSyncClient.class);
 
   @Test
   void testAllSnapshotSyncAsPerConfig() {
@@ -96,7 +117,12 @@ public class TestConversionController {
             eq(internalSnapshot)))
         .thenReturn(perTableResults);
     ConversionController conversionController =
-        new ConversionController(mockConf, mockConversionTargetFactory, 
tableFormatSync);
+        new ConversionController(
+            mockConf,
+            mockConversionTargetFactory,
+            mockCatalogConversionFactory,
+            tableFormatSync,
+            catalogSync);
     Map<String, SyncResult> result =
         conversionController.sync(conversionConfig, 
mockConversionSourceProvider);
     assertEquals(perTableResults, result);
@@ -182,7 +208,12 @@ public class TestConversionController {
     expectedSyncResult.put(TableFormat.ICEBERG, 
getLastSyncResult(icebergSyncResults));
     expectedSyncResult.put(TableFormat.DELTA, 
getLastSyncResult(deltaSyncResults));
     ConversionController conversionController =
-        new ConversionController(mockConf, mockConversionTargetFactory, 
tableFormatSync);
+        new ConversionController(
+            mockConf,
+            mockConversionTargetFactory,
+            mockCatalogConversionFactory,
+            tableFormatSync,
+            catalogSync);
     Map<String, SyncResult> result =
         conversionController.sync(conversionConfig, 
mockConversionSourceProvider);
     assertEquals(expectedSyncResult, result);
@@ -226,7 +257,12 @@ public class TestConversionController {
             eq(internalSnapshot)))
         .thenReturn(syncResults);
     ConversionController conversionController =
-        new ConversionController(mockConf, mockConversionTargetFactory, 
tableFormatSync);
+        new ConversionController(
+            mockConf,
+            mockConversionTargetFactory,
+            mockCatalogConversionFactory,
+            tableFormatSync,
+            catalogSync);
     Map<String, SyncResult> result =
         conversionController.sync(conversionConfig, 
mockConversionSourceProvider);
     assertEquals(syncResults, result);
@@ -310,7 +346,12 @@ public class TestConversionController {
     expectedSyncResult.put(TableFormat.ICEBERG, syncResult);
     expectedSyncResult.put(TableFormat.DELTA, 
getLastSyncResult(deltaSyncResults));
     ConversionController conversionController =
-        new ConversionController(mockConf, mockConversionTargetFactory, 
tableFormatSync);
+        new ConversionController(
+            mockConf,
+            mockConversionTargetFactory,
+            mockCatalogConversionFactory,
+            tableFormatSync,
+            catalogSync);
     Map<String, SyncResult> result =
         conversionController.sync(conversionConfig, 
mockConversionSourceProvider);
     assertEquals(expectedSyncResult, result);
@@ -368,16 +409,104 @@ public class TestConversionController {
     // Iceberg and Delta have no commits to sync
     Map<String, SyncResult> expectedSyncResult = Collections.emptyMap();
     ConversionController conversionController =
-        new ConversionController(mockConf, mockConversionTargetFactory, 
tableFormatSync);
+        new ConversionController(
+            mockConf,
+            mockConversionTargetFactory,
+            mockCatalogConversionFactory,
+            tableFormatSync,
+            catalogSync);
     Map<String, SyncResult> result =
         conversionController.sync(conversionConfig, 
mockConversionSourceProvider);
     assertEquals(expectedSyncResult, result);
   }
 
+  @Test
+  void testNoTableFormatConversionWithMultipleCatalogSync() {
+    SyncMode syncMode = SyncMode.INCREMENTAL;
+    List<TargetCatalogConfig> targetCatalogs =
+        Arrays.asList(getTargetCatalog("1"), getTargetCatalog("2"));
+    InternalTable internalTable = getInternalTable();
+    InternalSnapshot internalSnapshot = buildSnapshot(internalTable, "v1");
+    // Conversion source and target mocks.
+    ConversionConfig conversionConfig =
+        getTableSyncConfig(
+            Arrays.asList(TableFormat.ICEBERG, TableFormat.DELTA), syncMode, 
targetCatalogs);
+    when(mockConversionSourceProvider.getConversionSourceInstance(
+            conversionConfig.getSourceTable()))
+        .thenReturn(mockConversionSource);
+    when(mockConversionSourceProvider.getConversionSourceInstance(
+            convertToSourceTable(conversionConfig.getTargetTables().get(0))))
+        .thenReturn(mockConversionSource);
+    when(mockConversionSourceProvider.getConversionSourceInstance(
+            convertToSourceTable(conversionConfig.getTargetTables().get(1))))
+        .thenReturn(mockConversionSource);
+    when(mockConversionTargetFactory.createForFormat(
+            conversionConfig.getTargetTables().get(0), mockConf))
+        .thenReturn(mockConversionTarget1);
+    when(mockConversionTargetFactory.createForFormat(
+            conversionConfig.getTargetTables().get(1), mockConf))
+        .thenReturn(mockConversionTarget2);
+    
when(mockConversionSource.getCurrentSnapshot()).thenReturn(internalSnapshot);
+    
when(mockConversionSource.getCurrentTable()).thenReturn(getInternalTable());
+    // Mocks for tableFormatSync.
+    Instant instantBeforeHour = Instant.now().minus(Duration.ofHours(1));
+    Instant syncStartTime = Instant.now();
+    SyncResult syncResult =
+        buildSyncResult(syncMode, instantBeforeHour, syncStartTime, 
Duration.ofSeconds(1));
+    Map<String, SyncResult> tableFormatSyncResults =
+        buildPerTableResult(Arrays.asList(ICEBERG, DELTA), syncResult);
+    when(tableFormatSync.syncSnapshot(
+            argThat(containsAll(Arrays.asList(mockConversionTarget1, 
mockConversionTarget2))),
+            eq(internalSnapshot)))
+        .thenReturn(tableFormatSyncResults);
+    // Mocks for catalogSync.
+    when(mockCatalogConversionFactory.createCatalogSyncClient(
+            eq(targetCatalogs.get(0).getCatalogConfig()), any(), eq(mockConf)))
+        .thenReturn(mockCatalogSyncClient1);
+    when(mockCatalogConversionFactory.createCatalogSyncClient(
+            eq(targetCatalogs.get(1).getCatalogConfig()), any(), eq(mockConf)))
+        .thenReturn(mockCatalogSyncClient2);
+    when(catalogSync.syncTable(
+            eq(
+                ImmutableMap.of(
+                    targetCatalogs.get(0).getCatalogTableIdentifier(), 
mockCatalogSyncClient1,
+                    targetCatalogs.get(1).getCatalogTableIdentifier(), 
mockCatalogSyncClient2)),
+            any()))
+        .thenReturn(
+            buildSyncResult(syncMode, syncStartTime, instantBeforeHour, 
Duration.ofSeconds(3)));
+    ConversionController conversionController =
+        new ConversionController(
+            mockConf,
+            mockConversionTargetFactory,
+            mockCatalogConversionFactory,
+            tableFormatSync,
+            catalogSync);
+    // Mocks for conversionSourceProviders.
+    Map<String, ConversionSourceProvider> conversionSourceProviders = new 
HashMap<>();
+    conversionSourceProviders.put(HUDI, mockConversionSourceProvider);
+    conversionSourceProviders.put(ICEBERG, mockConversionSourceProvider);
+    conversionSourceProviders.put(DELTA, mockConversionSourceProvider);
+    // Assert results.
+    Map<String, SyncResult> mergedSyncResults =
+        buildPerTableResult(
+            Arrays.asList(ICEBERG, DELTA),
+            
syncResult.toBuilder().syncDuration(Duration.ofSeconds(4)).build());
+    Map<String, SyncResult> result =
+        conversionController.syncTableAcrossCatalogs(conversionConfig, 
conversionSourceProviders);
+    assertEquals(mergedSyncResults, result);
+  }
+
   private SyncResult getLastSyncResult(List<SyncResult> syncResults) {
     return syncResults.get(syncResults.size() - 1);
   }
 
+  private Map<String, SyncResult> buildPerTableResult(
+      List<String> tableFormats, SyncResult syncResult) {
+    Map<String, SyncResult> perTableResults = new HashMap<>();
+    tableFormats.forEach(tableFormat -> perTableResults.put(tableFormat, 
syncResult));
+    return perTableResults;
+  }
+
   private List<SyncResult> buildSyncResults(List<Instant> instantList) {
     return instantList.stream()
         .map(instant -> buildSyncResult(SyncMode.INCREMENTAL, instant))
@@ -396,6 +525,17 @@ public class TestConversionController {
         .build();
   }
 
+  private SyncResult buildSyncResult(
+      SyncMode syncMode, Instant syncStartTime, Instant lastSyncedInstant, 
Duration duration) {
+    return SyncResult.builder()
+        .mode(syncMode)
+        .lastInstantSynced(lastSyncedInstant)
+        .syncStartTime(syncStartTime)
+        .syncDuration(duration)
+        .tableFormatSyncStatus(SyncResult.SyncStatus.SUCCESS)
+        .build();
+  }
+
   private InternalSnapshot buildSnapshot(InternalTable internalTable, String 
version) {
     return 
InternalSnapshot.builder().table(internalTable).version(version).build();
   }
@@ -413,6 +553,13 @@ public class TestConversionController {
   }
 
   private ConversionConfig getTableSyncConfig(List<String> targetTableFormats, 
SyncMode syncMode) {
+    return getTableSyncConfig(targetTableFormats, syncMode, 
Collections.emptyList());
+  }
+
+  private ConversionConfig getTableSyncConfig(
+      List<String> targetTableFormats,
+      SyncMode syncMode,
+      List<TargetCatalogConfig> targetCatalogs) {
     SourceTable sourceTable =
         SourceTable.builder()
             .name("tablename")
@@ -434,10 +581,27 @@ public class TestConversionController {
     return ConversionConfig.builder()
         .sourceTable(sourceTable)
         .targetTables(targetTables)
+        .targetCatalogs(
+            targetTables.stream()
+                .collect(Collectors.toMap(Function.identity(), k -> 
targetCatalogs)))
         .syncMode(syncMode)
         .build();
   }
 
+  private TargetCatalogConfig getTargetCatalog(String suffix) {
+    return TargetCatalogConfig.builder()
+        .catalogConfig(
+            ExternalCatalogConfig.builder()
+                .catalogId("catalogId-" + suffix)
+                .catalogSyncClientImpl("catalogImpl-" + suffix)
+                .catalogProperties(Collections.emptyMap())
+                .build())
+        .catalogTableIdentifier(
+            new ThreePartHierarchicalTableIdentifier(
+                "target-database" + suffix, "target-tableName" + suffix))
+        .build();
+  }
+
   private static <T> ArgumentMatcher<Collection<T>> containsAll(Collection<T> 
expected) {
     return actual -> actual.size() == expected.size() && 
actual.containsAll(expected);
   }
diff --git 
a/xtable-core/src/test/java/org/apache/xtable/testutil/ITTestUtils.java 
b/xtable-core/src/test/java/org/apache/xtable/testutil/ITTestUtils.java
index 281e61fe..ce374f39 100644
--- a/xtable-core/src/test/java/org/apache/xtable/testutil/ITTestUtils.java
+++ b/xtable-core/src/test/java/org/apache/xtable/testutil/ITTestUtils.java
@@ -18,16 +18,25 @@
  
 package org.apache.xtable.testutil;
 
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
+import org.apache.hadoop.conf.Configuration;
 import org.junit.jupiter.api.Assertions;
 
+import org.apache.xtable.conversion.ExternalCatalogConfig;
+import org.apache.xtable.conversion.SourceTable;
 import org.apache.xtable.model.InternalTable;
+import org.apache.xtable.model.catalog.CatalogTableIdentifier;
 import org.apache.xtable.model.schema.InternalPartitionField;
 import org.apache.xtable.model.schema.InternalSchema;
 import org.apache.xtable.model.storage.DataLayoutStrategy;
+import org.apache.xtable.spi.extractor.CatalogConversionSource;
+import org.apache.xtable.spi.sync.CatalogSyncClient;
 
 public class ITTestUtils {
+  public static final String TEST_CATALOG_TYPE = "test";
 
   public static void validateTable(
       InternalTable internalTable,
@@ -44,4 +53,103 @@ public class ITTestUtils {
     Assertions.assertEquals(basePath, internalTable.getBasePath());
     Assertions.assertEquals(partitioningFields, 
internalTable.getPartitioningFields());
   }
+
+  public static class TestCatalogSyncImpl implements CatalogSyncClient {
+    private static final Map<String, Integer> FUNCTION_CALLS = new HashMap<>();
+
+    public TestCatalogSyncImpl(
+        ExternalCatalogConfig catalogConfig, String tableFormat, Configuration 
hadoopConf) {}
+
+    public TestCatalogSyncImpl() {}
+
+    @Override
+    public String getCatalogId() {
+      trackFunctionCall();
+      return null;
+    }
+
+    @Override
+    public String getCatalogType() {
+      return TEST_CATALOG_TYPE;
+    }
+
+    @Override
+    public String getStorageLocation(Object o) {
+      trackFunctionCall();
+      return null;
+    }
+
+    @Override
+    public boolean hasDatabase(CatalogTableIdentifier tableIdentifier) {
+      trackFunctionCall();
+      return false;
+    }
+
+    @Override
+    public void createDatabase(CatalogTableIdentifier tableIdentifier) {
+      trackFunctionCall();
+    }
+
+    @Override
+    public Object getTable(CatalogTableIdentifier tableIdentifier) {
+      trackFunctionCall();
+      return null;
+    }
+
+    @Override
+    public void createTable(InternalTable table, CatalogTableIdentifier 
tableIdentifier) {
+      trackFunctionCall();
+    }
+
+    @Override
+    public void refreshTable(
+        InternalTable table, Object catalogTable, CatalogTableIdentifier 
tableIdentifier) {
+      trackFunctionCall();
+    }
+
+    @Override
+    public void createOrReplaceTable(InternalTable table, 
CatalogTableIdentifier tableIdentifier) {
+      trackFunctionCall();
+    }
+
+    @Override
+    public void dropTable(InternalTable table, CatalogTableIdentifier 
tableIdentifier) {
+      trackFunctionCall();
+    }
+
+    @Override
+    public void close() throws Exception {
+      trackFunctionCall();
+    }
+
+    private void trackFunctionCall() {
+      String methodName = 
Thread.currentThread().getStackTrace()[2].getMethodName();
+      FUNCTION_CALLS.put(methodName, FUNCTION_CALLS.getOrDefault(methodName, 
0) + 1);
+    }
+
+    public static Map<String, Integer> getFunctionCalls() {
+      return FUNCTION_CALLS;
+    }
+  }
+
+  public static class TestCatalogConversionSourceImpl implements 
CatalogConversionSource {
+    public TestCatalogConversionSourceImpl(
+        ExternalCatalogConfig sourceCatalogConfig, Configuration 
configuration) {}
+
+    public TestCatalogConversionSourceImpl() {}
+
+    @Override
+    public SourceTable getSourceTable(CatalogTableIdentifier tableIdentifier) {
+      return SourceTable.builder()
+          .name("source_table_name")
+          .basePath("file://base_path/v1/")
+          .formatName("ICEBERG")
+          .build();
+    }
+
+    @Override
+    public String getCatalogType() {
+      return TEST_CATALOG_TYPE;
+    }
+  }
 }
diff --git 
a/xtable-core/src/test/resources/META-INF/services/org.apache.xtable.spi.extractor.CatalogConversionSource
 
b/xtable-core/src/test/resources/META-INF/services/org.apache.xtable.spi.extractor.CatalogConversionSource
new file mode 100644
index 00000000..ceb8a2ff
--- /dev/null
+++ 
b/xtable-core/src/test/resources/META-INF/services/org.apache.xtable.spi.extractor.CatalogConversionSource
@@ -0,0 +1,18 @@
+##########################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##########################################################################
+org.apache.xtable.testutil.ITTestUtils$TestCatalogConversionSourceImpl
\ No newline at end of file
diff --git 
a/xtable-core/src/test/resources/META-INF/services/org.apache.xtable.spi.sync.CatalogSyncClient
 
b/xtable-core/src/test/resources/META-INF/services/org.apache.xtable.spi.sync.CatalogSyncClient
new file mode 100644
index 00000000..4e571e3e
--- /dev/null
+++ 
b/xtable-core/src/test/resources/META-INF/services/org.apache.xtable.spi.sync.CatalogSyncClient
@@ -0,0 +1,18 @@
+##########################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##########################################################################
+org.apache.xtable.testutil.ITTestUtils$TestCatalogSyncImpl
\ No newline at end of file
diff --git a/xtable-utilities/pom.xml b/xtable-utilities/pom.xml
index bc91f99e..979cb456 100644
--- a/xtable-utilities/pom.xml
+++ b/xtable-utilities/pom.xml
@@ -35,6 +35,15 @@
             <version>${project.version}</version>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.xtable</groupId>
+            <artifactId>xtable-core_${scala.binary.version}</artifactId>
+            <version>${project.version}</version>
+            <classifier>tests</classifier>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
         <!-- command line arg parsing -->
         <dependency>
             <groupId>commons-cli</groupId>
@@ -125,6 +134,20 @@
             <artifactId>junit-jupiter-engine</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.xtable</groupId>
+            <artifactId>xtable-core_${scala.binary.version}</artifactId>
+            <version>${project.version}</version>
+            <classifier>tests</classifier>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hudi</groupId>
+            
<artifactId>hudi-spark${spark.version.prefix}-bundle_${scala.binary.version}</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>
diff --git 
a/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunCatalogSync.java
 
b/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunCatalogSync.java
new file mode 100644
index 00000000..60a62cd9
--- /dev/null
+++ 
b/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunCatalogSync.java
@@ -0,0 +1,365 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.xtable.utilities;
+
+import static org.apache.xtable.utilities.RunSync.getCustomConfigurations;
+import static org.apache.xtable.utilities.RunSync.loadHadoopConf;
+import static 
org.apache.xtable.utilities.RunSync.loadTableFormatConversionConfigs;
+
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import lombok.Builder;
+import lombok.Value;
+import lombok.extern.jackson.Jacksonized;
+import lombok.extern.log4j.Log4j2;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.DefaultParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.hadoop.conf.Configuration;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
+
+import org.apache.xtable.catalog.CatalogConversionFactory;
+import org.apache.xtable.conversion.ConversionConfig;
+import org.apache.xtable.conversion.ConversionController;
+import org.apache.xtable.conversion.ConversionSourceProvider;
+import org.apache.xtable.conversion.ExternalCatalogConfig;
+import org.apache.xtable.conversion.SourceTable;
+import org.apache.xtable.conversion.TargetCatalogConfig;
+import org.apache.xtable.conversion.TargetTable;
+import org.apache.xtable.hudi.HudiSourceConfig;
+import org.apache.xtable.model.catalog.CatalogTableIdentifier;
+import org.apache.xtable.model.catalog.HierarchicalTableIdentifier;
+import org.apache.xtable.model.catalog.ThreePartHierarchicalTableIdentifier;
+import org.apache.xtable.model.storage.CatalogType;
+import org.apache.xtable.model.sync.SyncMode;
+import org.apache.xtable.reflection.ReflectionUtils;
+import org.apache.xtable.spi.extractor.CatalogConversionSource;
+import 
org.apache.xtable.utilities.RunCatalogSync.DatasetConfig.StorageIdentifier;
+import 
org.apache.xtable.utilities.RunCatalogSync.DatasetConfig.TableIdentifier;
+import 
org.apache.xtable.utilities.RunCatalogSync.DatasetConfig.TargetTableIdentifier;
+import org.apache.xtable.utilities.RunSync.TableFormatConverters;
+
+/**
+ * Provides a standalone process for reading tables from a source catalog and 
synchronizing their
+ * state in target tables. It also supports table format conversion if the 
target table uses a
+ * different format from the source table.
+ */
+@Log4j2
+public class RunCatalogSync {
+  public static final ObjectMapper YAML_MAPPER = new ObjectMapper(new 
YAMLFactory());
+  private static final String CATALOG_SOURCE_AND_TARGET_CONFIG_PATH = 
"catalogConfig";
+  private static final String HADOOP_CONFIG_PATH = "hadoopConfig";
+  private static final String CONVERTERS_CONFIG_PATH = "convertersConfig";
+  private static final String HELP_OPTION = "h";
+  private static final Map<String, ConversionSourceProvider> 
CONVERSION_SOURCE_PROVIDERS =
+      new HashMap<>();
+
+  private static final Options OPTIONS =
+      new Options()
+          .addRequiredOption(
+              CATALOG_SOURCE_AND_TARGET_CONFIG_PATH,
+              "catalogSyncConfig",
+              true,
+              "The path to a yaml file containing source and target tables 
catalog configurations along with the table identifiers that need to synced")
+          .addOption(
+              HADOOP_CONFIG_PATH,
+              "hadoopConfig",
+              true,
+              "Hadoop config xml file path containing configs necessary to 
access the "
+                  + "file system. These configs will override the default 
configs.")
+          .addOption(
+              CONVERTERS_CONFIG_PATH,
+              "convertersConfig",
+              true,
+              "The path to a yaml file containing InternalTable converter 
configurations. "
+                  + "These configs will override the default")
+          .addOption(HELP_OPTION, "help", false, "Displays help information to 
run this utility");
+
+  public static void main(String[] args) throws Exception {
+    CommandLineParser parser = new DefaultParser();
+    CommandLine cmd;
+    try {
+      cmd = parser.parse(OPTIONS, args);
+    } catch (ParseException e) {
+      new HelpFormatter().printHelp("xtable.jar", OPTIONS, true);
+      return;
+    }
+
+    if (cmd.hasOption(HELP_OPTION)) {
+      HelpFormatter formatter = new HelpFormatter();
+      formatter.printHelp("RunCatalogSync", OPTIONS);
+      return;
+    }
+
+    DatasetConfig datasetConfig;
+    try (InputStream inputStream =
+        Files.newInputStream(
+            
Paths.get(cmd.getOptionValue(CATALOG_SOURCE_AND_TARGET_CONFIG_PATH)))) {
+      datasetConfig = YAML_MAPPER.readValue(inputStream, DatasetConfig.class);
+    }
+
+    byte[] customConfig = getCustomConfigurations(cmd, HADOOP_CONFIG_PATH);
+    Configuration hadoopConf = loadHadoopConf(customConfig);
+
+    customConfig = getCustomConfigurations(cmd, CONVERTERS_CONFIG_PATH);
+    TableFormatConverters tableFormatConverters = 
loadTableFormatConversionConfigs(customConfig);
+
+    Map<String, ExternalCatalogConfig> catalogsById =
+        datasetConfig.getTargetCatalogs().stream()
+            .collect(Collectors.toMap(ExternalCatalogConfig::getCatalogId, 
Function.identity()));
+    Optional<CatalogConversionSource> catalogConversionSource =
+        getCatalogConversionSource(datasetConfig.getSourceCatalog(), 
hadoopConf);
+    ConversionController conversionController = new 
ConversionController(hadoopConf);
+    for (DatasetConfig.Dataset dataset : datasetConfig.getDatasets()) {
+      SourceTable sourceTable =
+          getSourceTable(dataset.getSourceCatalogTableIdentifier(), 
catalogConversionSource);
+      List<TargetTable> targetTables = new ArrayList<>();
+      Map<TargetTable, List<TargetCatalogConfig>> targetCatalogs = new 
HashMap<>();
+      for (TargetTableIdentifier targetCatalogTableIdentifier :
+          dataset.getTargetCatalogTableIdentifiers()) {
+        TargetTable targetTable =
+            TargetTable.builder()
+                .name(sourceTable.getName())
+                .basePath(sourceTable.getBasePath())
+                .namespace(sourceTable.getNamespace())
+                .formatName(targetCatalogTableIdentifier.getTableFormat())
+                .build();
+        targetTables.add(targetTable);
+        if (!targetCatalogs.containsKey(targetTable)) {
+          targetCatalogs.put(targetTable, new ArrayList<>());
+        }
+        targetCatalogs
+            .get(targetTable)
+            .add(
+                TargetCatalogConfig.builder()
+                    .catalogTableIdentifier(
+                        getCatalogTableIdentifier(
+                            targetCatalogTableIdentifier.getTableIdentifier()))
+                    
.catalogConfig(catalogsById.get(targetCatalogTableIdentifier.getCatalogId()))
+                    .build());
+      }
+      ConversionConfig conversionConfig =
+          ConversionConfig.builder()
+              .sourceTable(sourceTable)
+              .targetTables(targetTables)
+              .targetCatalogs(targetCatalogs)
+              .syncMode(SyncMode.INCREMENTAL)
+              .build();
+      List<String> tableFormats =
+          Stream.concat(
+                  Stream.of(sourceTable.getFormatName()),
+                  targetTables.stream().map(TargetTable::getFormatName))
+              .distinct()
+              .collect(Collectors.toList());
+      try {
+        conversionController.syncTableAcrossCatalogs(
+            conversionConfig,
+            getConversionSourceProviders(tableFormats, tableFormatConverters, 
hadoopConf));
+      } catch (Exception e) {
+        log.error("Error running sync for {}", sourceTable.getBasePath(), e);
+      }
+    }
+  }
+
+  static Optional<CatalogConversionSource> getCatalogConversionSource(
+      ExternalCatalogConfig sourceCatalog, Configuration hadoopConf) {
+    if (CatalogType.STORAGE.equals(sourceCatalog.getCatalogType())) {
+      return Optional.empty();
+    }
+    return Optional.of(
+        CatalogConversionFactory.createCatalogConversionSource(sourceCatalog, 
hadoopConf));
+  }
+
+  static SourceTable getSourceTable(
+      DatasetConfig.SourceTableIdentifier sourceTableIdentifier,
+      Optional<CatalogConversionSource> catalogConversionSource) {
+    SourceTable sourceTable = null;
+    if (sourceTableIdentifier.getStorageIdentifier() != null) {
+      StorageIdentifier storageIdentifier = 
sourceTableIdentifier.getStorageIdentifier();
+      Properties sourceProperties = new Properties();
+      if (storageIdentifier.getPartitionSpec() != null) {
+        sourceProperties.put(
+            HudiSourceConfig.PARTITION_FIELD_SPEC_CONFIG, 
storageIdentifier.getPartitionSpec());
+      }
+      sourceTable =
+          SourceTable.builder()
+              .name(storageIdentifier.getTableName())
+              .basePath(storageIdentifier.getTableBasePath())
+              .namespace(
+                  storageIdentifier.getNamespace() == null
+                      ? null
+                      : storageIdentifier.getNamespace().split("\\."))
+              .dataPath(storageIdentifier.getTableDataPath())
+              .formatName(storageIdentifier.getTableFormat())
+              .additionalProperties(sourceProperties)
+              .build();
+    } else if (catalogConversionSource.isPresent()) {
+      sourceTable =
+          catalogConversionSource
+              .get()
+              .getSourceTable(
+                  
getCatalogTableIdentifier(sourceTableIdentifier.getTableIdentifier()));
+    }
+    return sourceTable;
+  }
+
+  static Map<String, ConversionSourceProvider> getConversionSourceProviders(
+      List<String> tableFormats,
+      TableFormatConverters tableFormatConverters,
+      Configuration hadoopConf) {
+    for (String tableFormat : tableFormats) {
+      if (CONVERSION_SOURCE_PROVIDERS.containsKey(tableFormat)) {
+        continue;
+      }
+      TableFormatConverters.ConversionConfig sourceConversionConfig =
+          tableFormatConverters.getTableFormatConverters().get(tableFormat);
+      if (sourceConversionConfig == null) {
+        throw new IllegalArgumentException(
+            String.format(
+                "Source format %s is not supported. Known source and target 
formats are %s",
+                tableFormat, 
tableFormatConverters.getTableFormatConverters().keySet()));
+      }
+      String sourceProviderClass = 
sourceConversionConfig.conversionSourceProviderClass;
+      ConversionSourceProvider<?> conversionSourceProvider =
+          ReflectionUtils.createInstanceOfClass(sourceProviderClass);
+      conversionSourceProvider.init(hadoopConf);
+      CONVERSION_SOURCE_PROVIDERS.put(tableFormat, conversionSourceProvider);
+    }
+    return CONVERSION_SOURCE_PROVIDERS;
+  }
+
+  /**
+   * Returns an implementation class for {@link CatalogTableIdentifier} based 
on the tableIdentifier
+   * provided by user.
+   */
+  static CatalogTableIdentifier getCatalogTableIdentifier(TableIdentifier 
tableIdentifier) {
+    if (tableIdentifier.getHierarchicalId() != null) {
+      return ThreePartHierarchicalTableIdentifier.fromDotSeparatedIdentifier(
+          tableIdentifier.getHierarchicalId());
+    }
+    throw new IllegalArgumentException("Invalid tableIdentifier configuration 
provided");
+  }
+
+  @Value
+  @Builder
+  @Jacksonized
+  public static class DatasetConfig {
+    /**
+     * Configuration of the source catalog from which XTable will read. It 
must contain all the
+     * necessary connection and access details for describing and listing 
tables.
+     */
+    ExternalCatalogConfig sourceCatalog;
+    /**
+     * Defines the configuration for one or more target catalogs, to which XTable will 
write or update
+     * tables. Unlike the source, these catalogs must be writable.
+     */
+    List<ExternalCatalogConfig> targetCatalogs;
+    /** A list of datasets that specify how a source table maps to one or more 
target tables. */
+    List<Dataset> datasets;
+
+    /** Configuration for catalog. */
+    ExternalCatalogConfig catalogConfig;
+
+    @Value
+    @Builder
+    @Jacksonized
+    public static class Dataset {
+      /** Identifies the source table in sourceCatalog. */
+      SourceTableIdentifier sourceCatalogTableIdentifier;
+      /** A list of one or more targets that this source table should be 
written to. */
+      List<TargetTableIdentifier> targetCatalogTableIdentifiers;
+    }
+
+    @Value
+    @Builder
+    @Jacksonized
+    public static class SourceTableIdentifier {
+      /** Specifies the table identifier in the source catalog. */
+      TableIdentifier tableIdentifier;
+      /**
+       * (Optional) Provides direct storage details such as a table’s base 
path (like an S3
+       * location) and the partition specification. This allows reading from a 
source even if it is
+       * not strictly registered in a catalog, as long as the format and 
location are known
+       */
+      StorageIdentifier storageIdentifier;
+    }
+
+    @Value
+    @Builder
+    @Jacksonized
+    public static class TargetTableIdentifier {
+      /**
+       * The user defined unique identifier of the target catalog where the 
table will be created or
+       * updated
+       */
+      String catalogId;
+      /**
+       * The target table format (e.g., DELTA, HUDI, ICEBERG), specifying how 
the data will be
+       * stored at the target.
+       */
+      String tableFormat;
+      /** Specifies the table identifier in the target catalog. */
+      TableIdentifier tableIdentifier;
+    }
+
+    @Value
+    @Builder
+    @Jacksonized
+    public static class TableIdentifier {
+      /**
+       * Specifies the three-level hierarchical table identifier for {@link
+       * HierarchicalTableIdentifier}.
+       */
+      String hierarchicalId;
+    }
+
+    /**
+     * Storage-level configuration for a table. This is an optional field in {@link
+     * SourceTableIdentifier}.
+     */
+    @Value
+    @Builder
+    @Jacksonized
+    public static class StorageIdentifier {
+      String tableFormat;
+      String tableBasePath;
+      String tableDataPath;
+      String tableName;
+      String partitionSpec;
+      String namespace;
+    }
+  }
+}
diff --git 
a/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunSync.java 
b/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunSync.java
index c84753de..1a7bda87 100644
--- a/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunSync.java
+++ b/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunSync.java
@@ -190,12 +190,12 @@ public class RunSync {
       try {
         conversionController.sync(conversionConfig, conversionSourceProvider);
       } catch (Exception e) {
-        log.error(String.format("Error running sync for %s", 
table.getTableBasePath()), e);
+        log.error("Error running sync for {}", table.getTableBasePath(), e);
       }
     }
   }
 
-  private static byte[] getCustomConfigurations(CommandLine cmd, String 
option) throws IOException {
+  static byte[] getCustomConfigurations(CommandLine cmd, String option) throws 
IOException {
     byte[] customConfig = null;
     if (cmd.hasOption(option)) {
       customConfig = Files.readAllBytes(Paths.get(cmd.getOptionValue(option)));
diff --git 
a/xtable-utilities/src/test/java/org/apache/xtable/utilities/ITRunCatalogSync.java
 
b/xtable-utilities/src/test/java/org/apache/xtable/utilities/ITRunCatalogSync.java
new file mode 100644
index 00000000..52cff85a
--- /dev/null
+++ 
b/xtable-utilities/src/test/java/org/apache/xtable/utilities/ITRunCatalogSync.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.xtable.utilities;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import lombok.SneakyThrows;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import org.apache.hudi.common.model.HoodieTableType;
+
+import org.apache.xtable.GenericTable;
+import org.apache.xtable.TestJavaHudiTable;
+import org.apache.xtable.conversion.ExternalCatalogConfig;
+import org.apache.xtable.model.storage.CatalogType;
+import org.apache.xtable.testutil.ITTestUtils;
+import org.apache.xtable.utilities.RunCatalogSync.DatasetConfig;
+import 
org.apache.xtable.utilities.RunCatalogSync.DatasetConfig.SourceTableIdentifier;
+import 
org.apache.xtable.utilities.RunCatalogSync.DatasetConfig.StorageIdentifier;
+import 
org.apache.xtable.utilities.RunCatalogSync.DatasetConfig.TableIdentifier;
+import 
org.apache.xtable.utilities.RunCatalogSync.DatasetConfig.TargetTableIdentifier;
+
+public class ITRunCatalogSync {
+
+  private static final List<String> EXPECTED_FUNCTION_CALLS =
+      Arrays.asList(
+          "hasDatabase",
+          "createDatabase",
+          "getTable",
+          "getStorageLocation",
+          "createTable",
+          "getCatalogId");
+
+  @Test
+  void testCatalogSync(@TempDir Path tempDir) throws Exception {
+    String tableName = "test-table";
+    try (GenericTable table =
+        TestJavaHudiTable.forStandardSchema(
+            tableName, tempDir, null, HoodieTableType.COPY_ON_WRITE)) {
+      table.insertRows(20);
+      File configFile = writeConfigFile(tempDir, table, tableName);
+      String[] args = new String[] {"-catalogConfig", configFile.getPath()};
+      RunCatalogSync.main(args);
+      validateTargetMetadataIsPresent(table.getBasePath());
+      Map<String, Integer> functionCalls = 
ITTestUtils.TestCatalogSyncImpl.getFunctionCalls();
+      EXPECTED_FUNCTION_CALLS.forEach(
+          (function -> Assertions.assertEquals(2, 
functionCalls.get(function))));
+    }
+  }
+
+  private static File writeConfigFile(Path tempDir, GenericTable table, String 
tableName)
+      throws IOException {
+    DatasetConfig config =
+        DatasetConfig.builder()
+            .sourceCatalog(
+                ExternalCatalogConfig.builder()
+                    .catalogId("source-catalog-1")
+                    .catalogType(CatalogType.STORAGE)
+                    .build())
+            .targetCatalogs(
+                Collections.singletonList(
+                    ExternalCatalogConfig.builder()
+                        .catalogId("target-catalog-1")
+                        
.catalogSyncClientImpl(ITTestUtils.TestCatalogSyncImpl.class.getName())
+                        .build()))
+            .datasets(
+                Collections.singletonList(
+                    DatasetConfig.Dataset.builder()
+                        .sourceCatalogTableIdentifier(
+                            SourceTableIdentifier.builder()
+                                .storageIdentifier(
+                                    StorageIdentifier.builder()
+                                        .tableBasePath(table.getBasePath())
+                                        .tableName(tableName)
+                                        .tableFormat("HUDI")
+                                        .build())
+                                .build())
+                        .targetCatalogTableIdentifiers(
+                            Arrays.asList(
+                                TargetTableIdentifier.builder()
+                                    .catalogId("target-catalog-1")
+                                    .tableFormat("DELTA")
+                                    .tableIdentifier(
+                                        TableIdentifier.builder()
+                                            
.hierarchicalId("database-1.table-1")
+                                            .build())
+                                    .build(),
+                                TargetTableIdentifier.builder()
+                                    .catalogId("target-catalog-1")
+                                    .tableFormat("ICEBERG")
+                                    .tableIdentifier(
+                                        TableIdentifier.builder()
+                                            
.hierarchicalId("catalog-2.database-2.table-2")
+                                            .build())
+                                    .build()))
+                        .build()))
+            .build();
+    File configFile = new File(tempDir + "config.yaml");
+    RunSync.YAML_MAPPER.writeValue(configFile, config);
+    return configFile;
+  }
+
+  @SneakyThrows
+  private void validateTargetMetadataIsPresent(String basePath) {
+    Path icebergMetadataPath = Paths.get(URI.create(basePath + "/metadata"));
+    long icebergMetadataFiles =
+        Files.list(icebergMetadataPath).filter(p -> 
p.toString().endsWith("metadata.json")).count();
+    Assertions.assertEquals(2, icebergMetadataFiles);
+    Path deltaMetadataPath = Paths.get(URI.create(basePath + "/_delta_log"));
+    long deltaMetadataFiles =
+        Files.list(deltaMetadataPath).filter(p -> 
p.toString().endsWith(".json")).count();
+    Assertions.assertEquals(1, deltaMetadataFiles);
+  }
+}
diff --git 
a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergCatalogConfig.java 
b/xtable-utilities/src/test/java/org/apache/xtable/utilities/TestRunCatalogSync.java
similarity index 60%
copy from 
xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergCatalogConfig.java
copy to 
xtable-utilities/src/test/java/org/apache/xtable/utilities/TestRunCatalogSync.java
index d5d7a3c5..3f504ff7 100644
--- 
a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergCatalogConfig.java
+++ 
b/xtable-utilities/src/test/java/org/apache/xtable/utilities/TestRunCatalogSync.java
@@ -16,21 +16,23 @@
  * limitations under the License.
  */
  
-package org.apache.xtable.iceberg;
+package org.apache.xtable.utilities;
 
-import java.util.Collections;
-import java.util.Map;
+import static org.junit.jupiter.api.Assertions.*;
 
-import lombok.Builder;
-import lombok.NonNull;
-import lombok.Value;
+import lombok.SneakyThrows;
 
-import org.apache.xtable.conversion.CatalogConfig;
+import org.junit.jupiter.api.Test;
 
-@Value
-@Builder
-public class IcebergCatalogConfig implements CatalogConfig {
-  @NonNull String catalogImpl;
-  @NonNull String catalogName;
-  @NonNull @Builder.Default Map<String, String> catalogOptions = 
Collections.emptyMap();
+class TestRunCatalogSync {
+
+  @SneakyThrows
+  @Test
+  void testMain() {
+    String catalogConfigYamlPath =
+        
TestRunCatalogSync.class.getClassLoader().getResource("catalogConfig.yaml").getPath();
+    String[] args = {"-catalogConfig", catalogConfigYamlPath};
+    // Ensure yaml gets parsed without any errors.
+    assertDoesNotThrow(() -> RunCatalogSync.main(args));
+  }
 }
diff --git a/xtable-utilities/src/test/resources/catalogConfig.yaml 
b/xtable-utilities/src/test/resources/catalogConfig.yaml
new file mode 100644
index 00000000..05b2df4b
--- /dev/null
+++ b/xtable-utilities/src/test/resources/catalogConfig.yaml
@@ -0,0 +1,71 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+sourceCatalog:
+  catalogId: "source-1"
+  catalogConversionSourceImpl: 
"org.apache.xtable.testutil.ITTestUtils$TestCatalogConversionSourceImpl"
+  catalogSyncClientImpl: 
"org.apache.xtable.testutil.ITTestUtils$TestCatalogImpl"
+  catalogProperties:
+    key01: "value1"
+    key02: "value2"
+    key03: "value3"
+targetCatalogs:
+  - catalogId: "target-1"
+    catalogSyncClientImpl: 
"org.apache.xtable.testutil.ITTestUtils$TestCatalogImpl"
+    catalogProperties:
+      key11: "value1"
+      key12: "value2"
+      key13: "value3"
+  - catalogId: "target-2"
+    catalogSyncClientImpl: 
"org.apache.xtable.testutil.ITTestUtils$TestCatalogImpl"
+    catalogProperties:
+      key21: "value1"
+      key22: "value2"
+      key23: "value3"
+  - catalogId: "target-3"
+    catalogSyncClientImpl: 
"org.apache.xtable.testutil.ITTestUtils$TestCatalogImpl"
+    catalogProperties:
+      key31: "value1"
+      key32: "value2"
+      key33: "value3"
+datasets:
+  - sourceCatalogTableIdentifier:
+      tableIdentifier:
+        hierarchicalId: "source-database-1.source-1"
+    targetCatalogTableIdentifiers:
+      - catalogId: "target-1"
+        tableFormat: "DELTA"
+        tableIdentifier:
+          hierarchicalId: "target-database-1.target-tableName-1"
+      - catalogId: "target-2"
+        tableFormat: "HUDI"
+        tableIdentifier:
+          hierarchicalId: "target-database-2.target-tableName-2-delta"
+  - sourceCatalogTableIdentifier:
+      storageIdentifier:
+        tableBasePath: s3://tpc-ds-datasets/1GB/hudi/catalog_sales
+        tableName: catalog_sales
+        partitionSpec: cs_sold_date_sk:VALUE
+        tableFormat: "HUDI"
+    targetCatalogTableIdentifiers:
+      - catalogId: "target-2"
+        tableFormat: "ICEBERG"
+        tableIdentifier:
+          hierarchicalId: "target-database-2.target-tableName-2"
+      - catalogId: "target-3"
+        tableFormat: "HUDI"
+        tableIdentifier:
+          hierarchicalId: 
"default-catalog-2.target-database-3.target-tableName-3"


Reply via email to