This is an automated email from the ASF dual-hosted git repository.

timbrown pushed a commit to branch 297-properties-based-config-2
in repository https://gitbox.apache.org/repos/asf/incubator-xtable.git

commit 3159009a132544c9b682f2e044c9eefc794d1ade
Author: Timothy Brown <[email protected]>
AuthorDate: Sun Aug 18 19:59:44 2024 -0500

    Refactor config classes
---
 .../{PerTableConfig.java => ConversionConfig.java} |  41 +--
 .../apache/xtable/conversion/ExternalTable.java    |  77 +++++
 .../{HudiSourceConfig.java => SourceTable.java}    |  27 +-
 .../{PerTableConfig.java => TargetTable.java}      |  48 ++--
 .../apache/xtable/spi/sync/ConversionTarget.java   |   4 +-
 .../xtable/conversion/TestExternalTable.java       |  56 ++++
 .../apache/xtable/conversion/TestSourceTable.java} |  48 ++--
 .../apache/xtable/conversion/TestTargetTable.java} |  30 +-
 .../xtable/conversion/ConversionController.java    |  25 +-
 .../conversion/ConversionSourceProvider.java       |  13 +-
 .../xtable/conversion/ConversionTargetFactory.java |  16 +-
 .../xtable/conversion/PerTableConfigImpl.java      | 142 ----------
 .../delta/DeltaConversionSourceProvider.java       |  10 +-
 .../apache/xtable/delta/DeltaConversionTarget.java |  28 +-
 .../ConfigurationBasedPartitionSpecExtractor.java  |   4 +-
 .../xtable/hudi/HudiConversionSourceProvider.java  |  11 +-
 .../apache/xtable/hudi/HudiConversionTarget.java   |  20 +-
 ...SourceConfigImpl.java => HudiSourceConfig.java} |  32 ++-
 .../xtable/iceberg/IcebergConversionSource.java    |  33 ++-
 .../iceberg/IcebergConversionSourceProvider.java   |   4 +-
 .../xtable/iceberg/IcebergConversionTarget.java    |  22 +-
 .../org/apache/xtable/ITConversionController.java  | 310 +++++++++++----------
 .../xtable/conversion/TestConversionConfig.java    |  66 +++++
 .../conversion/TestConversionController.java       | 106 ++++---
 .../conversion/TestConversionTargetFactory.java    |  37 +--
 .../xtable/conversion/TestPerTableConfig.java      | 118 --------
 .../delta/ITDeltaConversionTargetSource.java       |  95 ++++---
 .../org/apache/xtable/delta/TestDeltaSync.java     |  14 +-
 .../xtable/hudi/ITHudiConversionSourceSource.java  |   2 +-
 .../xtable/hudi/ITHudiConversionSourceTarget.java  |  13 +-
 .../iceberg/ITIcebergConversionTargetSource.java   |  57 ++--
 .../iceberg/TestIcebergConversionTargetSource.java |  22 +-
 .../org/apache/xtable/iceberg/TestIcebergSync.java |  14 +-
 .../java/org/apache/xtable/loadtest/LoadTest.java  |  76 +++--
 .../apache/xtable/hudi/sync/XTableSyncConfig.java  |   4 +-
 .../apache/xtable/hudi/sync/XTableSyncTool.java    |  59 ++--
 .../xtable/hudi/sync/TestXTableSyncTool.java       |   2 +-
 .../java/org/apache/xtable/utilities/RunSync.java  |  64 +++--
 38 files changed, 924 insertions(+), 826 deletions(-)

diff --git 
a/xtable-api/src/main/java/org/apache/xtable/conversion/PerTableConfig.java 
b/xtable-api/src/main/java/org/apache/xtable/conversion/ConversionConfig.java
similarity index 54%
copy from 
xtable-api/src/main/java/org/apache/xtable/conversion/PerTableConfig.java
copy to 
xtable-api/src/main/java/org/apache/xtable/conversion/ConversionConfig.java
index 612fc9b3..73e2628d 100644
--- a/xtable-api/src/main/java/org/apache/xtable/conversion/PerTableConfig.java
+++ 
b/xtable-api/src/main/java/org/apache/xtable/conversion/ConversionConfig.java
@@ -20,24 +20,31 @@ package org.apache.xtable.conversion;
 
 import java.util.List;
 
-import org.apache.xtable.model.sync.SyncMode;
-
-public interface PerTableConfig {
-  int getTargetMetadataRetentionInHours();
-
-  String getTableBasePath();
-
-  String getTableDataPath();
+import lombok.Builder;
+import lombok.NonNull;
+import lombok.Value;
 
-  String getTableName();
+import com.google.common.base.Preconditions;
 
-  HudiSourceConfig getHudiSourceConfig();
-
-  List<String> getTargetTableFormats();
-
-  SyncMode getSyncMode();
-
-  String[] getNamespace();
+import org.apache.xtable.model.sync.SyncMode;
 
-  CatalogConfig getIcebergCatalogConfig();
+@Value
+public class ConversionConfig {
+  // The source of the sync
+  @NonNull SourceTable sourceTable;
+  // One or more targets to sync the table metadata to
+  List<TargetTable> targetTables;
+  // The mode, incremental or snapshot
+  SyncMode syncMode;
+
+  @Builder
+  ConversionConfig(
+      @NonNull SourceTable sourceTable, List<TargetTable> targetTables, 
SyncMode syncMode) {
+    this.sourceTable = sourceTable;
+    this.targetTables = targetTables;
+    Preconditions.checkArgument(
+        targetTables != null && !targetTables.isEmpty(),
+        "Please provide at-least one format to sync");
+    this.syncMode = syncMode == null ? SyncMode.INCREMENTAL : syncMode;
+  }
 }
diff --git 
a/xtable-api/src/main/java/org/apache/xtable/conversion/ExternalTable.java 
b/xtable-api/src/main/java/org/apache/xtable/conversion/ExternalTable.java
new file mode 100644
index 00000000..939c59c0
--- /dev/null
+++ b/xtable-api/src/main/java/org/apache/xtable/conversion/ExternalTable.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.xtable.conversion;
+
+import java.util.Properties;
+
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.NonNull;
+
+import org.apache.hadoop.fs.Path;
+
+import com.google.common.base.Preconditions;
+
+/** Defines a reference to a table in a particular format. */
+@Getter
+@EqualsAndHashCode
+class ExternalTable {
+  /** The name of the table. */
+  protected final @NonNull String name;
+  /** The format of the table (e.g. DELTA, ICEBERG, HUDI) */
+  protected final @NonNull String formatName;
+  /** The path to the root of the table or the metadata directory depending on 
the format */
+  protected final @NonNull String basePath;
+  /** Optional namespace for the table */
+  protected final String[] namespace;
+  /** The configuration for interacting with the catalog that manages this 
table */
+  protected final CatalogConfig catalogConfig;
+
+  /** Optional, additional properties that can be used to define interactions 
with the table */
+  protected final Properties additionalProperties;
+
+  ExternalTable(
+      @NonNull String name,
+      @NonNull String formatName,
+      @NonNull String basePath,
+      String[] namespace,
+      CatalogConfig catalogConfig,
+      Properties additionalProperties) {
+    this.name = name;
+    this.formatName = formatName;
+    this.basePath = sanitizeBasePath(basePath);
+    this.namespace = namespace;
+    this.catalogConfig = catalogConfig;
+    this.additionalProperties = additionalProperties;
+  }
+
+  protected String sanitizeBasePath(String tableBasePath) {
+    Path path = new Path(tableBasePath);
+    Preconditions.checkArgument(path.isAbsolute(), "Table base path must be 
absolute");
+    if (path.isAbsoluteAndSchemeAuthorityNull()) {
+      // assume this is local file system and append scheme
+      return "file://" + path;
+    } else if (path.toUri().getScheme().equals("file")) {
+      // add extra slashes
+      return "file://" + path.toUri().getPath();
+    } else {
+      return path.toString();
+    }
+  }
+}
diff --git 
a/xtable-api/src/main/java/org/apache/xtable/conversion/HudiSourceConfig.java 
b/xtable-api/src/main/java/org/apache/xtable/conversion/SourceTable.java
similarity index 53%
rename from 
xtable-api/src/main/java/org/apache/xtable/conversion/HudiSourceConfig.java
rename to xtable-api/src/main/java/org/apache/xtable/conversion/SourceTable.java
index 6edf356f..b37e1c1e 100644
--- 
a/xtable-api/src/main/java/org/apache/xtable/conversion/HudiSourceConfig.java
+++ b/xtable-api/src/main/java/org/apache/xtable/conversion/SourceTable.java
@@ -18,10 +18,29 @@
  
 package org.apache.xtable.conversion;
 
-import org.apache.xtable.spi.extractor.SourcePartitionSpecExtractor;
+import java.util.Properties;
 
-public interface HudiSourceConfig {
-  String getPartitionSpecExtractorClass();
+import lombok.Builder;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.NonNull;
 
-  SourcePartitionSpecExtractor loadSourcePartitionSpecExtractor();
+@EqualsAndHashCode(callSuper = true)
+@Getter
+public class SourceTable extends ExternalTable {
+  /** The path to the data files, defaults to the metadataPath */
+  @NonNull private final String dataPath;
+
+  @Builder(toBuilder = true)
+  public SourceTable(
+      String name,
+      String formatName,
+      String basePath,
+      String dataPath,
+      String[] namespace,
+      CatalogConfig catalogConfig,
+      Properties additionalProperties) {
+    super(name, formatName, basePath, namespace, catalogConfig, 
additionalProperties);
+    this.dataPath = dataPath == null ? this.getBasePath() : 
sanitizeBasePath(dataPath);
+  }
 }
diff --git 
a/xtable-api/src/main/java/org/apache/xtable/conversion/PerTableConfig.java 
b/xtable-api/src/main/java/org/apache/xtable/conversion/TargetTable.java
similarity index 52%
copy from 
xtable-api/src/main/java/org/apache/xtable/conversion/PerTableConfig.java
copy to xtable-api/src/main/java/org/apache/xtable/conversion/TargetTable.java
index 612fc9b3..6256da2c 100644
--- a/xtable-api/src/main/java/org/apache/xtable/conversion/PerTableConfig.java
+++ b/xtable-api/src/main/java/org/apache/xtable/conversion/TargetTable.java
@@ -18,26 +18,30 @@
  
 package org.apache.xtable.conversion;
 
-import java.util.List;
-
-import org.apache.xtable.model.sync.SyncMode;
-
-public interface PerTableConfig {
-  int getTargetMetadataRetentionInHours();
-
-  String getTableBasePath();
-
-  String getTableDataPath();
-
-  String getTableName();
-
-  HudiSourceConfig getHudiSourceConfig();
-
-  List<String> getTargetTableFormats();
-
-  SyncMode getSyncMode();
-
-  String[] getNamespace();
-
-  CatalogConfig getIcebergCatalogConfig();
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
+import java.util.Properties;
+
+import lombok.Builder;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+
+@Getter
+@EqualsAndHashCode(callSuper = true)
+public class TargetTable extends ExternalTable {
+  private final Duration metadataRetention;
+
+  @Builder(toBuilder = true)
+  public TargetTable(
+      String name,
+      String formatName,
+      String basePath,
+      String[] namespace,
+      CatalogConfig catalogConfig,
+      Duration metadataRetention,
+      Properties additionalProperties) {
+    super(name, formatName, basePath, namespace, catalogConfig, 
additionalProperties);
+    this.metadataRetention =
+        metadataRetention == null ? Duration.of(7, ChronoUnit.DAYS) : 
metadataRetention;
+  }
 }
diff --git 
a/xtable-api/src/main/java/org/apache/xtable/spi/sync/ConversionTarget.java 
b/xtable-api/src/main/java/org/apache/xtable/spi/sync/ConversionTarget.java
index a0df756a..736f49e4 100644
--- a/xtable-api/src/main/java/org/apache/xtable/spi/sync/ConversionTarget.java
+++ b/xtable-api/src/main/java/org/apache/xtable/spi/sync/ConversionTarget.java
@@ -23,7 +23,7 @@ import java.util.Optional;
 
 import org.apache.hadoop.conf.Configuration;
 
-import org.apache.xtable.conversion.PerTableConfig;
+import org.apache.xtable.conversion.TargetTable;
 import org.apache.xtable.model.InternalTable;
 import org.apache.xtable.model.metadata.TableSyncMetadata;
 import org.apache.xtable.model.schema.InternalPartitionField;
@@ -89,5 +89,5 @@ public interface ConversionTarget {
   String getTableFormat();
 
   /** Initializes the client with provided configuration */
-  void init(PerTableConfig perTableConfig, Configuration configuration);
+  void init(TargetTable targetTable, Configuration configuration);
 }
diff --git 
a/xtable-api/src/test/java/org/apache/xtable/conversion/TestExternalTable.java 
b/xtable-api/src/test/java/org/apache/xtable/conversion/TestExternalTable.java
new file mode 100644
index 00000000..5422b0a7
--- /dev/null
+++ 
b/xtable-api/src/test/java/org/apache/xtable/conversion/TestExternalTable.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.xtable.conversion;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import org.junit.jupiter.api.Test;
+
+public class TestExternalTable {
+  @Test
+  void sanitizePath() {
+    ExternalTable tooManySlashes =
+        new ExternalTable("name", "hudi", "s3://bucket//path", null, null, 
null);
+    assertEquals("s3://bucket/path", tooManySlashes.getBasePath());
+
+    ExternalTable localFilePath =
+        new ExternalTable("name", "hudi", "/local/data//path", null, null, 
null);
+    assertEquals("file:///local/data/path", localFilePath.getBasePath());
+
+    ExternalTable properLocalFilePath =
+        new ExternalTable("name", "hudi", "file:///local/data//path", null, 
null, null);
+    assertEquals("file:///local/data/path", properLocalFilePath.getBasePath());
+  }
+
+  @Test
+  void errorIfRequiredArgsNotSet() {
+    assertThrows(
+        NullPointerException.class,
+        () -> new ExternalTable("name", "hudi", null, null, null, null));
+
+    assertThrows(
+        NullPointerException.class,
+        () -> new ExternalTable("name", null, "file://bucket/path", null, 
null, null));
+
+    assertThrows(
+        NullPointerException.class,
+        () -> new ExternalTable(null, "hudi", "file://bucket/path", null, 
null, null));
+  }
+}
diff --git 
a/xtable-api/src/main/java/org/apache/xtable/conversion/PerTableConfig.java 
b/xtable-api/src/test/java/org/apache/xtable/conversion/TestSourceTable.java
similarity index 51%
copy from 
xtable-api/src/main/java/org/apache/xtable/conversion/PerTableConfig.java
copy to 
xtable-api/src/test/java/org/apache/xtable/conversion/TestSourceTable.java
index 612fc9b3..05effaf9 100644
--- a/xtable-api/src/main/java/org/apache/xtable/conversion/PerTableConfig.java
+++ b/xtable-api/src/test/java/org/apache/xtable/conversion/TestSourceTable.java
@@ -18,26 +18,30 @@
  
 package org.apache.xtable.conversion;
 
-import java.util.List;
-
-import org.apache.xtable.model.sync.SyncMode;
-
-public interface PerTableConfig {
-  int getTargetMetadataRetentionInHours();
-
-  String getTableBasePath();
-
-  String getTableDataPath();
-
-  String getTableName();
-
-  HudiSourceConfig getHudiSourceConfig();
-
-  List<String> getTargetTableFormats();
-
-  SyncMode getSyncMode();
-
-  String[] getNamespace();
-
-  CatalogConfig getIcebergCatalogConfig();
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import org.junit.jupiter.api.Test;
+
+class TestSourceTable {
+  @Test
+  void dataPathDefaultsToMetadataPath() {
+    String basePath = "file:///path/to/table";
+    SourceTable sourceTable =
+        
SourceTable.builder().name("name").formatName("hudi").basePath(basePath).build();
+    assertEquals(basePath, sourceTable.getDataPath());
+  }
+
+  @Test
+  void dataPathIsSanitized() {
+    String basePath = "file:///path/to/table";
+    String dataPath = "file:///path/to/table//data";
+    SourceTable sourceTable =
+        SourceTable.builder()
+            .name("name")
+            .formatName("hudi")
+            .basePath(basePath)
+            .dataPath(dataPath)
+            .build();
+    assertEquals("file:///path/to/table/data", sourceTable.getDataPath());
+  }
 }
diff --git 
a/xtable-api/src/main/java/org/apache/xtable/conversion/PerTableConfig.java 
b/xtable-api/src/test/java/org/apache/xtable/conversion/TestTargetTable.java
similarity index 65%
rename from 
xtable-api/src/main/java/org/apache/xtable/conversion/PerTableConfig.java
rename to 
xtable-api/src/test/java/org/apache/xtable/conversion/TestTargetTable.java
index 612fc9b3..9faa22c8 100644
--- a/xtable-api/src/main/java/org/apache/xtable/conversion/PerTableConfig.java
+++ b/xtable-api/src/test/java/org/apache/xtable/conversion/TestTargetTable.java
@@ -18,26 +18,18 @@
  
 package org.apache.xtable.conversion;
 
-import java.util.List;
+import static org.junit.jupiter.api.Assertions.assertEquals;
 
-import org.apache.xtable.model.sync.SyncMode;
+import java.time.Duration;
 
-public interface PerTableConfig {
-  int getTargetMetadataRetentionInHours();
+import org.junit.jupiter.api.Test;
 
-  String getTableBasePath();
-
-  String getTableDataPath();
-
-  String getTableName();
-
-  HudiSourceConfig getHudiSourceConfig();
-
-  List<String> getTargetTableFormats();
-
-  SyncMode getSyncMode();
-
-  String[] getNamespace();
-
-  CatalogConfig getIcebergCatalogConfig();
+class TestTargetTable {
+  @Test
+  void retentionDefaultsToSevenDays() {
+    String basePath = "file:///path/to/table";
+    TargetTable targetTable =
+        
TargetTable.builder().name("name").formatName("hudi").basePath(basePath).build();
+    assertEquals(Duration.ofDays(7), targetTable.getMetadataRetention());
+  }
 }
diff --git 
a/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionController.java
 
b/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionController.java
index 34968733..dc665969 100644
--- 
a/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionController.java
+++ 
b/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionController.java
@@ -26,7 +26,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -52,7 +51,7 @@ import org.apache.xtable.spi.sync.TableFormatSync;
 
 /**
  * Responsible for completing the entire lifecycle of the sync process given 
{@link
- * PerTableConfigImpl}. This is done in three steps,
+ * ConversionConfig}. This is done in three steps,
  *
  * <ul>
  *   <li>1. Extracting snapshot {@link InternalSnapshot} from the source table 
format.
@@ -72,33 +71,31 @@ public class ConversionController {
   }
 
   /**
-   * Runs a sync for the given source table configuration in PerTableConfig.
+   * Runs a sync for the given source table configuration in ConversionConfig.
    *
    * @param config A per table level config containing tableBasePath, 
partitionFieldSpecConfig,
    *     targetTableFormats and syncMode
    * @param conversionSourceProvider A provider for the {@link 
ConversionSource} instance, {@link
-   *     ConversionSourceProvider#init(Configuration, Map)} must be called 
before calling this
-   *     method.
+   *     ConversionSourceProvider#init(Configuration)} must be called before 
calling this method.
    * @return Returns a map containing the table format, and it's sync result. 
Run sync for a table
    *     with the provided per table level configuration.
    */
   public <COMMIT> Map<String, SyncResult> sync(
-      PerTableConfig config, ConversionSourceProvider<COMMIT> 
conversionSourceProvider) {
-    if (config.getTargetTableFormats() == null || 
config.getTargetTableFormats().isEmpty()) {
+      ConversionConfig config, ConversionSourceProvider<COMMIT> 
conversionSourceProvider) {
+    if (config.getTargetTables() == null || 
config.getTargetTables().isEmpty()) {
       throw new IllegalArgumentException("Please provide at-least one format 
to sync");
     }
 
     try (ConversionSource<COMMIT> conversionSource =
-        conversionSourceProvider.getConversionSourceInstance(config)) {
+        
conversionSourceProvider.getConversionSourceInstance(config.getSourceTable())) {
       ExtractFromSource<COMMIT> source = 
ExtractFromSource.of(conversionSource);
 
       Map<String, ConversionTarget> conversionTargetByFormat =
-          config.getTargetTableFormats().stream()
+          config.getTargetTables().stream()
               .collect(
                   Collectors.toMap(
-                      Function.identity(),
-                      tableFormat ->
-                          conversionTargetFactory.createForFormat(tableFormat, 
config, conf)));
+                      TargetTable::getFormatName,
+                      targetTable -> 
conversionTargetFactory.createForFormat(targetTable, conf)));
       // State for each TableFormat
       Map<String, Optional<TableSyncMetadata>> lastSyncMetadataByFormat =
           conversionTargetByFormat.entrySet().stream()
@@ -152,11 +149,11 @@ public class ConversionController {
   }
 
   private <COMMIT> Map<String, ConversionTarget> getFormatsToSyncIncrementally(
-      PerTableConfig perTableConfig,
+      ConversionConfig conversionConfig,
       Map<String, ConversionTarget> conversionTargetByFormat,
       Map<String, Optional<TableSyncMetadata>> lastSyncMetadataByFormat,
       ConversionSource<COMMIT> conversionSource) {
-    if (perTableConfig.getSyncMode() == SyncMode.FULL) {
+    if (conversionConfig.getSyncMode() == SyncMode.FULL) {
       // Full sync requested by config, hence no incremental sync.
       return Collections.emptyMap();
     }
diff --git 
a/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionSourceProvider.java
 
b/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionSourceProvider.java
index 09cad6db..ccd6fde7 100644
--- 
a/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionSourceProvider.java
+++ 
b/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionSourceProvider.java
@@ -18,8 +18,6 @@
  
 package org.apache.xtable.conversion;
 
-import java.util.Map;
-
 import org.apache.hadoop.conf.Configuration;
 
 import org.apache.xtable.spi.extractor.ConversionSource;
@@ -33,16 +31,9 @@ public abstract class ConversionSourceProvider<COMMIT> {
   /** The Hadoop configuration to use when reading from the source table. */
   protected Configuration hadoopConf;
 
-  /** The configuration for the source. */
-  protected Map<String, String> sourceConf;
-
-  /** The configuration for the table to read from. */
-  protected PerTableConfig sourceTableConfig;
-
   /** Initializes the provider various source specific configurations. */
-  public void init(Configuration hadoopConf, Map<String, String> sourceConf) {
+  public void init(Configuration hadoopConf) {
     this.hadoopConf = hadoopConf;
-    this.sourceConf = sourceConf;
   }
 
   /**
@@ -54,5 +45,5 @@ public abstract class ConversionSourceProvider<COMMIT> {
    * @return the conversion source
    */
   public abstract ConversionSource<COMMIT> getConversionSourceInstance(
-      PerTableConfig sourceTableConfig);
+      SourceTable sourceTableConfig);
 }
diff --git 
a/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionTargetFactory.java
 
b/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionTargetFactory.java
index 68e85b4d..8e355897 100644
--- 
a/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionTargetFactory.java
+++ 
b/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionTargetFactory.java
@@ -38,19 +38,17 @@ public class ConversionTargetFactory {
 
   /**
    * Create a fully initialized instance of the ConversionTarget represented 
by the given Table
-   * Format name. Initialization is done with the config provided through 
PerTableConfig and
+   * Format name. Initialization is done with the config provided through 
TargetTable and
    * Configuration params.
    *
-   * @param tableFormat
-   * @param perTableConfig
-   * @param configuration
-   * @return
+   * @param targetTable the spec of the target
+   * @param configuration hadoop configuration
+   * @return an intialized {@link ConversionTarget}
    */
-  public ConversionTarget createForFormat(
-      String tableFormat, PerTableConfig perTableConfig, Configuration 
configuration) {
-    ConversionTarget conversionTarget = 
createConversionTargetForName(tableFormat);
+  public ConversionTarget createForFormat(TargetTable targetTable, 
Configuration configuration) {
+    ConversionTarget conversionTarget = 
createConversionTargetForName(targetTable.getFormatName());
 
-    conversionTarget.init(perTableConfig, configuration);
+    conversionTarget.init(targetTable, configuration);
     return conversionTarget;
   }
 
diff --git 
a/xtable-core/src/main/java/org/apache/xtable/conversion/PerTableConfigImpl.java
 
b/xtable-core/src/main/java/org/apache/xtable/conversion/PerTableConfigImpl.java
deleted file mode 100644
index f34402b1..00000000
--- 
a/xtable-core/src/main/java/org/apache/xtable/conversion/PerTableConfigImpl.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- 
-package org.apache.xtable.conversion;
-
-import java.util.List;
-
-import javax.annotation.Nonnull;
-
-import lombok.Builder;
-import lombok.NonNull;
-import lombok.Value;
-
-import org.apache.hadoop.fs.Path;
-
-import com.google.common.base.Preconditions;
-
-import org.apache.xtable.hudi.HudiSourceConfigImpl;
-import org.apache.xtable.iceberg.IcebergCatalogConfig;
-import org.apache.xtable.model.sync.SyncMode;
-
-/** Represents input configuration to the sync process. */
-@Value
-public class PerTableConfigImpl implements PerTableConfig {
-  /** table base path in local file system or HDFS or object stores like S3, 
GCS etc. */
-  @Nonnull String tableBasePath;
-  /** the base path for the data folder, defaults to the tableBasePath if not 
specified */
-  @Nonnull String tableDataPath;
-
-  /** The name of the table */
-  @Nonnull String tableName;
-
-  /** The namespace of the table (optional) */
-  String[] namespace;
-
-  /**
-   * HudiSourceConfig is a config that allows us to infer partition values for 
hoodie source tables.
-   * If the table is not partitioned, leave it blank. If it is partitioned, 
you can specify a spec
-   * with a comma separated list with format path:type:format.
-   *
-   * <p><ui>
-   * <li>partitionSpecExtractorClass: class to extract partition fields from 
the given
-   *     spec.ConfigurationBasedPartitionSpecExtractor is the default class
-   * <li>partitionFieldSpecConfig: path:type:format spec to infer partition 
values </ui>
-   *
-   *     <ul>
-   *       <li>path: is a dot separated path to the partition field
-   *       <li>type: describes how the partition value was generated from the 
column value
-   *           <ul>
-   *             <li>VALUE: an identity transform of field value to partition 
value
-   *             <li>YEAR: data is partitioned by a field representing a date 
and year granularity
-   *                 is used
-   *             <li>MONTH: same as YEAR but with month granularity
-   *             <li>DAY: same as YEAR but with day granularity
-   *             <li>HOUR: same as YEAR but with hour granularity
-   *           </ul>
-   *       <li>format: if your partition type is YEAR, MONTH, DAY, or HOUR 
specify the format for
-   *           the date string as it appears in your file paths
-   *     </ul>
-   */
-  @Nonnull HudiSourceConfigImpl hudiSourceConfig;
-
-  /** List of table formats to sync. */
-  @Nonnull List<String> targetTableFormats;
-
-  /** Configuration options for integrating with an existing Iceberg Catalog 
(optional) */
-  IcebergCatalogConfig icebergCatalogConfig;
-
-  /**
-   * Mode of a sync. FULL is only supported right now.
-   *
-   * <ul>
-   *   <li>FULL: Full sync will create a checkpoint of ALL the files relevant 
at a certain point in
-   *       time
-   *   <li>INCREMENTAL: Incremental will sync differential structures to bring 
the table state from
-   *       and to points in the timeline
-   * </ul>
-   */
-  @Nonnull SyncMode syncMode;
-
-  /**
-   * The retention for metadata or versions of the table in the target systems 
to bound the size of
-   * any metadata tracked in the target system. Specified in hours.
-   */
-  int targetMetadataRetentionInHours;
-
-  @Builder
-  PerTableConfigImpl(
-      @NonNull String tableBasePath,
-      String tableDataPath,
-      @NonNull String tableName,
-      String[] namespace,
-      HudiSourceConfigImpl hudiSourceConfig,
-      @NonNull List<String> targetTableFormats,
-      IcebergCatalogConfig icebergCatalogConfig,
-      SyncMode syncMode,
-      Integer targetMetadataRetentionInHours) {
-    // sanitize source path
-    this.tableBasePath = sanitizeBasePath(tableBasePath);
-    this.tableDataPath = tableDataPath == null ? tableBasePath : 
sanitizeBasePath(tableDataPath);
-    this.tableName = tableName;
-    this.namespace = namespace;
-    this.hudiSourceConfig =
-        hudiSourceConfig == null ? HudiSourceConfigImpl.builder().build() : 
hudiSourceConfig;
-    Preconditions.checkArgument(
-        targetTableFormats.size() > 0, "Please provide at-least one format to 
sync");
-    this.targetTableFormats = targetTableFormats;
-    this.icebergCatalogConfig = icebergCatalogConfig;
-    this.syncMode = syncMode == null ? SyncMode.INCREMENTAL : syncMode;
-    this.targetMetadataRetentionInHours =
-        targetMetadataRetentionInHours == null ? 24 * 7 : 
targetMetadataRetentionInHours;
-  }
-
-  private String sanitizeBasePath(String tableBasePath) {
-    Path path = new Path(tableBasePath);
-    Preconditions.checkArgument(path.isAbsolute(), "Table base path must be 
absolute");
-    if (path.isAbsoluteAndSchemeAuthorityNull()) {
-      // assume this is local file system and append scheme
-      return "file://" + path;
-    } else if (path.toUri().getScheme().equals("file")) {
-      // add extra slashes
-      return "file://" + path.toUri().getPath();
-    } else {
-      return path.toString();
-    }
-  }
-}
diff --git 
a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionSourceProvider.java
 
b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionSourceProvider.java
index f42425db..045e2b72 100644
--- 
a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionSourceProvider.java
+++ 
b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionSourceProvider.java
@@ -23,18 +23,18 @@ import org.apache.spark.sql.SparkSession;
 import io.delta.tables.DeltaTable;
 
 import org.apache.xtable.conversion.ConversionSourceProvider;
-import org.apache.xtable.conversion.PerTableConfig;
+import org.apache.xtable.conversion.SourceTable;
 
 /** A concrete implementation of {@link ConversionSourceProvider} for Delta 
Lake table format. */
 public class DeltaConversionSourceProvider extends 
ConversionSourceProvider<Long> {
   @Override
-  public DeltaConversionSource getConversionSourceInstance(PerTableConfig 
perTableConfig) {
+  public DeltaConversionSource getConversionSourceInstance(SourceTable 
sourceTable) {
     SparkSession sparkSession = 
DeltaConversionUtils.buildSparkSession(hadoopConf);
-    DeltaTable deltaTable = DeltaTable.forPath(sparkSession, 
perTableConfig.getTableBasePath());
+    DeltaTable deltaTable = DeltaTable.forPath(sparkSession, 
sourceTable.getBasePath());
     return DeltaConversionSource.builder()
         .sparkSession(sparkSession)
-        .tableName(perTableConfig.getTableName())
-        .basePath(perTableConfig.getTableBasePath())
+        .tableName(sourceTable.getName())
+        .basePath(sourceTable.getBasePath())
         .deltaTable(deltaTable)
         .deltaLog(deltaTable.deltaLog())
         .build();
diff --git 
a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionTarget.java 
b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionTarget.java
index 120ee0e0..b34fa449 100644
--- 
a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionTarget.java
+++ 
b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionTarget.java
@@ -54,7 +54,7 @@ import scala.collection.Seq;
 
 import com.google.common.annotations.VisibleForTesting;
 
-import org.apache.xtable.conversion.PerTableConfig;
+import org.apache.xtable.conversion.TargetTable;
 import org.apache.xtable.exception.NotSupportedException;
 import org.apache.xtable.model.InternalTable;
 import org.apache.xtable.model.metadata.TableSyncMetadata;
@@ -76,16 +76,16 @@ public class DeltaConversionTarget implements 
ConversionTarget {
   private DeltaDataFileUpdatesExtractor dataFileUpdatesExtractor;
 
   private String tableName;
-  private int logRetentionInHours;
+  private long logRetentionInHours;
   private TransactionState transactionState;
 
   public DeltaConversionTarget() {}
 
-  public DeltaConversionTarget(PerTableConfig perTableConfig, SparkSession 
sparkSession) {
+  public DeltaConversionTarget(TargetTable targetTable, SparkSession 
sparkSession) {
     this(
-        perTableConfig.getTableDataPath(),
-        perTableConfig.getTableName(),
-        perTableConfig.getTargetMetadataRetentionInHours(),
+        targetTable.getBasePath(),
+        targetTable.getName(),
+        targetTable.getMetadataRetention().toHours(),
         sparkSession,
         DeltaSchemaExtractor.getInstance(),
         DeltaPartitionExtractor.getInstance(),
@@ -96,7 +96,7 @@ public class DeltaConversionTarget implements 
ConversionTarget {
   DeltaConversionTarget(
       String tableDataPath,
       String tableName,
-      int logRetentionInHours,
+      long logRetentionInHours,
       SparkSession sparkSession,
       DeltaSchemaExtractor schemaExtractor,
       DeltaPartitionExtractor partitionExtractor,
@@ -115,7 +115,7 @@ public class DeltaConversionTarget implements 
ConversionTarget {
   private void _init(
       String tableDataPath,
       String tableName,
-      int logRetentionInHours,
+      long logRetentionInHours,
       SparkSession sparkSession,
       DeltaSchemaExtractor schemaExtractor,
       DeltaPartitionExtractor partitionExtractor,
@@ -134,13 +134,13 @@ public class DeltaConversionTarget implements 
ConversionTarget {
   }
 
   @Override
-  public void init(PerTableConfig perTableConfig, Configuration configuration) 
{
+  public void init(TargetTable targetTable, Configuration configuration) {
     SparkSession sparkSession = 
DeltaConversionUtils.buildSparkSession(configuration);
 
     _init(
-        perTableConfig.getTableDataPath(),
-        perTableConfig.getTableName(),
-        perTableConfig.getTargetMetadataRetentionInHours(),
+        targetTable.getBasePath(),
+        targetTable.getName(),
+        targetTable.getMetadataRetention().toHours(),
         sparkSession,
         DeltaSchemaExtractor.getInstance(),
         DeltaPartitionExtractor.getInstance(),
@@ -221,7 +221,7 @@ public class DeltaConversionTarget implements 
ConversionTarget {
     private final OptimisticTransaction transaction;
     private final Instant commitTime;
     private final DeltaLog deltaLog;
-    private final int retentionInHours;
+    private final long retentionInHours;
     @Getter private final List<String> partitionColumns;
     private final String tableName;
     @Getter private StructType latestSchema;
@@ -230,7 +230,7 @@ public class DeltaConversionTarget implements 
ConversionTarget {
     @Setter private Seq<Action> actions;
 
     private TransactionState(
-        DeltaLog deltaLog, String tableName, Instant latestCommitTime, int 
retentionInHours) {
+        DeltaLog deltaLog, String tableName, Instant latestCommitTime, long 
retentionInHours) {
       this.deltaLog = deltaLog;
       this.transaction = deltaLog.startTransaction();
       this.latestSchema = deltaLog.snapshot().schema();
diff --git 
a/xtable-core/src/main/java/org/apache/xtable/hudi/ConfigurationBasedPartitionSpecExtractor.java
 
b/xtable-core/src/main/java/org/apache/xtable/hudi/ConfigurationBasedPartitionSpecExtractor.java
index e3659f19..7bc41a10 100644
--- 
a/xtable-core/src/main/java/org/apache/xtable/hudi/ConfigurationBasedPartitionSpecExtractor.java
+++ 
b/xtable-core/src/main/java/org/apache/xtable/hudi/ConfigurationBasedPartitionSpecExtractor.java
@@ -36,13 +36,13 @@ import org.apache.xtable.schema.SchemaFieldFinder;
  */
 @AllArgsConstructor
 public class ConfigurationBasedPartitionSpecExtractor implements 
HudiSourcePartitionSpecExtractor {
-  private final HudiSourceConfigImpl config;
+  private final HudiSourceConfig config;
 
   @Override
   public List<InternalPartitionField> spec(InternalSchema tableSchema) {
     List<InternalPartitionField> partitionFields =
         new ArrayList<>(config.getPartitionFieldSpecs().size());
-    for (HudiSourceConfigImpl.PartitionFieldSpec fieldSpec : 
config.getPartitionFieldSpecs()) {
+    for (HudiSourceConfig.PartitionFieldSpec fieldSpec : 
config.getPartitionFieldSpecs()) {
       InternalField sourceField =
           SchemaFieldFinder.getInstance()
               .findFieldByPath(tableSchema, fieldSpec.getSourceFieldPath());
diff --git 
a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionSourceProvider.java
 
b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionSourceProvider.java
index f6e5e28d..0ddbbcb7 100644
--- 
a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionSourceProvider.java
+++ 
b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionSourceProvider.java
@@ -25,19 +25,18 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 
 import org.apache.xtable.conversion.ConversionSourceProvider;
-import org.apache.xtable.conversion.PerTableConfig;
+import org.apache.xtable.conversion.SourceTable;
 
 /** A concrete implementation of {@link ConversionSourceProvider} for Hudi 
table format. */
 @Log4j2
 public class HudiConversionSourceProvider extends 
ConversionSourceProvider<HoodieInstant> {
 
   @Override
-  public HudiConversionSource getConversionSourceInstance(PerTableConfig 
sourceTableConfig) {
-    this.sourceTableConfig = sourceTableConfig;
+  public HudiConversionSource getConversionSourceInstance(SourceTable 
sourceTable) {
     HoodieTableMetaClient metaClient =
         HoodieTableMetaClient.builder()
             .setConf(hadoopConf)
-            .setBasePath(this.sourceTableConfig.getTableBasePath())
+            .setBasePath(sourceTable.getBasePath())
             .setLoadActiveTimelineOnLoad(true)
             .build();
     if 
(!metaClient.getTableConfig().getTableType().equals(HoodieTableType.COPY_ON_WRITE))
 {
@@ -45,8 +44,8 @@ public class HudiConversionSourceProvider extends 
ConversionSourceProvider<Hoodi
     }
 
     final HudiSourcePartitionSpecExtractor sourcePartitionSpecExtractor =
-        (HudiSourcePartitionSpecExtractor)
-            
sourceTableConfig.getHudiSourceConfig().loadSourcePartitionSpecExtractor();
+        HudiSourceConfig.fromProperties(sourceTable.getAdditionalProperties())
+            .loadSourcePartitionSpecExtractor();
 
     return new HudiConversionSource(metaClient, sourcePartitionSpecExtractor);
   }
diff --git 
a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionTarget.java 
b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionTarget.java
index b1a0bc91..c3ef6f92 100644
--- a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionTarget.java
+++ b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionTarget.java
@@ -78,7 +78,7 @@ import org.apache.hudi.table.action.clean.CleanPlanner;
 import com.google.common.annotations.VisibleForTesting;
 
 import org.apache.xtable.avro.AvroSchemaConverter;
-import org.apache.xtable.conversion.PerTableConfig;
+import org.apache.xtable.conversion.TargetTable;
 import org.apache.xtable.exception.NotSupportedException;
 import org.apache.xtable.exception.ReadException;
 import org.apache.xtable.exception.UpdateException;
@@ -108,16 +108,15 @@ public class HudiConversionTarget implements 
ConversionTarget {
 
   @VisibleForTesting
   HudiConversionTarget(
-      PerTableConfig perTableConfig,
+      TargetTable targetTable,
       Configuration configuration,
       int maxNumDeltaCommitsBeforeCompaction) {
     this(
-        perTableConfig.getTableDataPath(),
-        perTableConfig.getTargetMetadataRetentionInHours(),
+        targetTable.getBasePath(),
+        (int) targetTable.getMetadataRetention().toHours(),
         maxNumDeltaCommitsBeforeCompaction,
         BaseFileUpdatesExtractor.of(
-            new HoodieJavaEngineContext(configuration),
-            new CachingPath(perTableConfig.getTableDataPath())),
+            new HoodieJavaEngineContext(configuration), new 
CachingPath(targetTable.getBasePath())),
         AvroSchemaConverter.getInstance(),
         HudiTableManager.of(configuration),
         CommitState::new);
@@ -163,14 +162,13 @@ public class HudiConversionTarget implements 
ConversionTarget {
   }
 
   @Override
-  public void init(PerTableConfig perTableConfig, Configuration configuration) 
{
+  public void init(TargetTable targetTable, Configuration configuration) {
     _init(
-        perTableConfig.getTableDataPath(),
-        perTableConfig.getTargetMetadataRetentionInHours(),
+        targetTable.getBasePath(),
+        (int) targetTable.getMetadataRetention().toHours(),
         HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.defaultValue(),
         BaseFileUpdatesExtractor.of(
-            new HoodieJavaEngineContext(configuration),
-            new CachingPath(perTableConfig.getTableDataPath())),
+            new HoodieJavaEngineContext(configuration), new 
CachingPath(targetTable.getBasePath())),
         AvroSchemaConverter.getInstance(),
         HudiTableManager.of(configuration),
         CommitState::new);
diff --git 
a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiSourceConfigImpl.java 
b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiSourceConfig.java
similarity index 68%
rename from 
xtable-core/src/main/java/org/apache/xtable/hudi/HudiSourceConfigImpl.java
rename to xtable-core/src/main/java/org/apache/xtable/hudi/HudiSourceConfig.java
index 0fa8b529..606b9281 100644
--- a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiSourceConfigImpl.java
+++ b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiSourceConfig.java
@@ -22,29 +22,41 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Locale;
+import java.util.Properties;
 
-import lombok.Builder;
 import lombok.Value;
 
 import com.google.common.base.Preconditions;
 
-import org.apache.xtable.conversion.HudiSourceConfig;
 import org.apache.xtable.model.schema.PartitionTransformType;
 import org.apache.xtable.reflection.ReflectionUtils;
 
 /** Configuration of Hudi source format for the sync process. */
 @Value
-public class HudiSourceConfigImpl implements HudiSourceConfig {
+public class HudiSourceConfig {
+  public static final String PARTITION_SPEC_EXTRACTOR_CLASS =
+      "xtable.hudi.source.partition_spec_extractor_class";
+  public static final String PARTITION_FIELD_SPEC_CONFIG =
+      "xtable.hudi.source.partition_field_spec_config";
+
   String partitionSpecExtractorClass;
   List<PartitionFieldSpec> partitionFieldSpecs;
 
-  @Builder
-  public HudiSourceConfigImpl(String partitionSpecExtractorClass, String 
partitionFieldSpecConfig) {
-    this.partitionSpecExtractorClass =
-        partitionSpecExtractorClass == null
-            ? ConfigurationBasedPartitionSpecExtractor.class.getName()
-            : partitionSpecExtractorClass;
-    this.partitionFieldSpecs = 
parsePartitionFieldSpecs(partitionFieldSpecConfig);
+  public static HudiSourceConfig fromPartitionFieldSpecConfig(String 
partitionFieldSpecConfig) {
+    return new HudiSourceConfig(
+        ConfigurationBasedPartitionSpecExtractor.class.getName(),
+        parsePartitionFieldSpecs(partitionFieldSpecConfig));
+  }
+
+  public static HudiSourceConfig fromProperties(Properties properties) {
+    String partitionSpecExtractorClass =
+        properties.getProperty(
+            PARTITION_SPEC_EXTRACTOR_CLASS,
+            ConfigurationBasedPartitionSpecExtractor.class.getName());
+    String partitionFieldSpecString = 
properties.getProperty(PARTITION_FIELD_SPEC_CONFIG);
+    List<PartitionFieldSpec> partitionFieldSpecs =
+        parsePartitionFieldSpecs(partitionFieldSpecString);
+    return new HudiSourceConfig(partitionSpecExtractorClass, 
partitionFieldSpecs);
   }
 
   @Value
diff --git 
a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSource.java
 
b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSource.java
index 7b26ee86..f96ec714 100644
--- 
a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSource.java
+++ 
b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSource.java
@@ -20,23 +20,35 @@ package org.apache.xtable.iceberg;
 
 import java.io.IOException;
 import java.time.Instant;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
 
-import lombok.*;
+import lombok.AccessLevel;
+import lombok.Builder;
+import lombok.Getter;
+import lombok.NonNull;
 import lombok.extern.log4j.Log4j2;
 
 import org.apache.hadoop.conf.Configuration;
 
-import org.apache.iceberg.*;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableScan;
 import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.hadoop.HadoopFileIO;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.FileIO;
 
-import org.apache.xtable.conversion.PerTableConfig;
+import org.apache.xtable.conversion.SourceTable;
 import org.apache.xtable.exception.ReadException;
 import org.apache.xtable.model.CommitsBacklog;
 import org.apache.xtable.model.InstantsForIncrementalSync;
@@ -46,15 +58,18 @@ import org.apache.xtable.model.TableChange;
 import org.apache.xtable.model.schema.InternalPartitionField;
 import org.apache.xtable.model.schema.InternalSchema;
 import org.apache.xtable.model.stat.PartitionValue;
-import org.apache.xtable.model.storage.*;
+import org.apache.xtable.model.storage.DataFilesDiff;
+import org.apache.xtable.model.storage.DataLayoutStrategy;
 import org.apache.xtable.model.storage.InternalDataFile;
+import org.apache.xtable.model.storage.PartitionFileGroup;
+import org.apache.xtable.model.storage.TableFormat;
 import org.apache.xtable.spi.extractor.ConversionSource;
 
 @Log4j2
 @Builder
 public class IcebergConversionSource implements ConversionSource<Snapshot> {
   @NonNull private final Configuration hadoopConf;
-  @NonNull private final PerTableConfig sourceTableConfig;
+  @NonNull private final SourceTable sourceTableConfig;
 
   @Getter(lazy = true, value = AccessLevel.PACKAGE)
   private final Table sourceTable = initSourceTable();
@@ -73,15 +88,15 @@ public class IcebergConversionSource implements 
ConversionSource<Snapshot> {
   private Table initSourceTable() {
     IcebergTableManager tableManager = IcebergTableManager.of(hadoopConf);
     String[] namespace = sourceTableConfig.getNamespace();
-    String tableName = sourceTableConfig.getTableName();
+    String tableName = sourceTableConfig.getName();
     TableIdentifier tableIdentifier =
         namespace == null
             ? TableIdentifier.of(tableName)
             : TableIdentifier.of(Namespace.of(namespace), tableName);
     return tableManager.getTable(
-        (IcebergCatalogConfig) sourceTableConfig.getIcebergCatalogConfig(),
+        (IcebergCatalogConfig) sourceTableConfig.getCatalogConfig(),
         tableIdentifier,
-        sourceTableConfig.getTableBasePath());
+        sourceTableConfig.getBasePath());
   }
 
   private FileIO initTableOps() {
diff --git 
a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSourceProvider.java
 
b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSourceProvider.java
index 9a2bb840..449ebe5d 100644
--- 
a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSourceProvider.java
+++ 
b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSourceProvider.java
@@ -21,12 +21,12 @@ package org.apache.xtable.iceberg;
 import org.apache.iceberg.Snapshot;
 
 import org.apache.xtable.conversion.ConversionSourceProvider;
-import org.apache.xtable.conversion.PerTableConfig;
+import org.apache.xtable.conversion.SourceTable;
 
 /** A concrete implementation of {@link ConversionSourceProvider} for Hudi 
table format. */
 public class IcebergConversionSourceProvider extends 
ConversionSourceProvider<Snapshot> {
   @Override
-  public IcebergConversionSource getConversionSourceInstance(PerTableConfig 
sourceTableConfig) {
+  public IcebergConversionSource getConversionSourceInstance(SourceTable 
sourceTableConfig) {
     return IcebergConversionSource.builder()
         .sourceTableConfig(sourceTableConfig)
         .hadoopConf(hadoopConf)
diff --git 
a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionTarget.java
 
b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionTarget.java
index 72125cb0..ecdbfa26 100644
--- 
a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionTarget.java
+++ 
b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionTarget.java
@@ -39,7 +39,7 @@ import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.exceptions.NotFoundException;
 
-import org.apache.xtable.conversion.PerTableConfig;
+import org.apache.xtable.conversion.TargetTable;
 import org.apache.xtable.model.InternalTable;
 import org.apache.xtable.model.metadata.TableSyncMetadata;
 import org.apache.xtable.model.schema.InternalPartitionField;
@@ -70,7 +70,7 @@ public class IcebergConversionTarget implements 
ConversionTarget {
   public IcebergConversionTarget() {}
 
   IcebergConversionTarget(
-      PerTableConfig perTableConfig,
+      TargetTable targetTable,
       Configuration configuration,
       IcebergSchemaExtractor schemaExtractor,
       IcebergSchemaSync schemaSync,
@@ -79,7 +79,7 @@ public class IcebergConversionTarget implements 
ConversionTarget {
       IcebergDataFileUpdatesSync dataFileUpdatesExtractor,
       IcebergTableManager tableManager) {
     _init(
-        perTableConfig,
+        targetTable,
         configuration,
         schemaExtractor,
         schemaSync,
@@ -90,7 +90,7 @@ public class IcebergConversionTarget implements 
ConversionTarget {
   }
 
   private void _init(
-      PerTableConfig perTableConfig,
+      TargetTable targetTable,
       Configuration configuration,
       IcebergSchemaExtractor schemaExtractor,
       IcebergSchemaSync schemaSync,
@@ -103,17 +103,17 @@ public class IcebergConversionTarget implements 
ConversionTarget {
     this.partitionSpecExtractor = partitionSpecExtractor;
     this.partitionSpecSync = partitionSpecSync;
     this.dataFileUpdatesExtractor = dataFileUpdatesExtractor;
-    String tableName = perTableConfig.getTableName();
-    this.basePath = perTableConfig.getTableBasePath();
+    String tableName = targetTable.getName();
+    this.basePath = targetTable.getBasePath();
     this.configuration = configuration;
-    this.snapshotRetentionInHours = 
perTableConfig.getTargetMetadataRetentionInHours();
-    String[] namespace = perTableConfig.getNamespace();
+    this.snapshotRetentionInHours = (int) 
targetTable.getMetadataRetention().toHours();
+    String[] namespace = targetTable.getNamespace();
     this.tableIdentifier =
         namespace == null
             ? TableIdentifier.of(tableName)
             : TableIdentifier.of(Namespace.of(namespace), tableName);
     this.tableManager = tableManager;
-    this.catalogConfig = (IcebergCatalogConfig) 
perTableConfig.getIcebergCatalogConfig();
+    this.catalogConfig = (IcebergCatalogConfig) targetTable.getCatalogConfig();
 
     if (tableManager.tableExists(catalogConfig, tableIdentifier, basePath)) {
       // Load the table state if it already exists
@@ -124,9 +124,9 @@ public class IcebergConversionTarget implements 
ConversionTarget {
   }
 
   @Override
-  public void init(PerTableConfig perTableConfig, Configuration configuration) 
{
+  public void init(TargetTable targetTable, Configuration configuration) {
     _init(
-        perTableConfig,
+        targetTable,
         configuration,
         IcebergSchemaExtractor.getInstance(),
         IcebergSchemaSync.getInstance(),
diff --git 
a/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java 
b/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java
index 44338494..58f0f982 100644
--- a/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java
+++ b/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java
@@ -19,6 +19,7 @@
 package org.apache.xtable;
 
 import static org.apache.xtable.GenericTable.getTableName;
+import static 
org.apache.xtable.hudi.HudiSourceConfig.PARTITION_FIELD_SPEC_CONFIG;
 import static org.apache.xtable.hudi.HudiTestUtil.PartitionConfig;
 import static org.apache.xtable.model.storage.TableFormat.DELTA;
 import static org.apache.xtable.model.storage.TableFormat.HUDI;
@@ -30,6 +31,7 @@ import java.net.URI;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.time.Duration;
 import java.time.Instant;
 import java.time.ZoneId;
 import java.time.format.DateTimeFormatter;
@@ -41,6 +43,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Properties;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
@@ -81,13 +84,13 @@ import org.apache.spark.sql.delta.DeltaLog;
 
 import com.google.common.collect.ImmutableList;
 
+import org.apache.xtable.conversion.ConversionConfig;
 import org.apache.xtable.conversion.ConversionController;
 import org.apache.xtable.conversion.ConversionSourceProvider;
-import org.apache.xtable.conversion.PerTableConfig;
-import org.apache.xtable.conversion.PerTableConfigImpl;
+import org.apache.xtable.conversion.SourceTable;
+import org.apache.xtable.conversion.TargetTable;
 import org.apache.xtable.delta.DeltaConversionSourceProvider;
 import org.apache.xtable.hudi.HudiConversionSourceProvider;
-import org.apache.xtable.hudi.HudiSourceConfigImpl;
 import org.apache.xtable.hudi.HudiTestUtil;
 import org.apache.xtable.iceberg.IcebergConversionSourceProvider;
 import org.apache.xtable.model.storage.TableFormat;
@@ -147,17 +150,17 @@ public class ITConversionController {
     if (sourceTableFormat.equalsIgnoreCase(HUDI)) {
       ConversionSourceProvider<HoodieInstant> hudiConversionSourceProvider =
           new HudiConversionSourceProvider();
-      hudiConversionSourceProvider.init(jsc.hadoopConfiguration(), 
Collections.emptyMap());
+      hudiConversionSourceProvider.init(jsc.hadoopConfiguration());
       return hudiConversionSourceProvider;
     } else if (sourceTableFormat.equalsIgnoreCase(DELTA)) {
       ConversionSourceProvider<Long> deltaConversionSourceProvider =
           new DeltaConversionSourceProvider();
-      deltaConversionSourceProvider.init(jsc.hadoopConfiguration(), 
Collections.emptyMap());
+      deltaConversionSourceProvider.init(jsc.hadoopConfiguration());
       return deltaConversionSourceProvider;
     } else if (sourceTableFormat.equalsIgnoreCase(ICEBERG)) {
       ConversionSourceProvider<Snapshot> icebergConversionSourceProvider =
           new IcebergConversionSourceProvider();
-      icebergConversionSourceProvider.init(jsc.hadoopConfiguration(), 
Collections.emptyMap());
+      icebergConversionSourceProvider.init(jsc.hadoopConfiguration());
       return icebergConversionSourceProvider;
     } else {
       throw new IllegalArgumentException("Unsupported source format: " + 
sourceTableFormat);
@@ -193,27 +196,26 @@ public class ITConversionController {
             tableName, tempDir, sparkSession, jsc, sourceTableFormat, 
isPartitioned)) {
       insertRecords = table.insertRows(100);
 
-      PerTableConfig perTableConfig =
-          PerTableConfigImpl.builder()
-              .tableName(tableName)
-              .targetTableFormats(targetTableFormats)
-              .tableBasePath(table.getBasePath())
-              .tableDataPath(table.getDataPath())
-              .hudiSourceConfig(
-                  
HudiSourceConfigImpl.builder().partitionFieldSpecConfig(partitionConfig).build())
-              .syncMode(syncMode)
-              .build();
-      conversionController.sync(perTableConfig, conversionSourceProvider);
+      ConversionConfig conversionConfig =
+          getTableSyncConfig(
+              sourceTableFormat,
+              syncMode,
+              tableName,
+              table,
+              targetTableFormats,
+              partitionConfig,
+              null);
+      conversionController.sync(conversionConfig, conversionSourceProvider);
       checkDatasetEquivalence(sourceTableFormat, table, targetTableFormats, 
100);
 
       // make multiple commits and then sync
       table.insertRows(100);
       table.upsertRows(insertRecords.subList(0, 20));
-      conversionController.sync(perTableConfig, conversionSourceProvider);
+      conversionController.sync(conversionConfig, conversionSourceProvider);
       checkDatasetEquivalence(sourceTableFormat, table, targetTableFormats, 
200);
 
       table.deleteRows(insertRecords.subList(30, 50));
-      conversionController.sync(perTableConfig, conversionSourceProvider);
+      conversionController.sync(conversionConfig, conversionSourceProvider);
       checkDatasetEquivalence(sourceTableFormat, table, targetTableFormats, 
180);
       checkDatasetEquivalenceWithFilter(
           sourceTableFormat, table, targetTableFormats, 
table.getFilterQuery());
@@ -222,39 +224,38 @@ public class ITConversionController {
     try (GenericTable tableWithUpdatedSchema =
         GenericTable.getInstanceWithAdditionalColumns(
             tableName, tempDir, sparkSession, jsc, sourceTableFormat, 
isPartitioned)) {
-      PerTableConfig perTableConfig =
-          PerTableConfigImpl.builder()
-              .tableName(tableName)
-              .targetTableFormats(targetTableFormats)
-              .tableBasePath(tableWithUpdatedSchema.getBasePath())
-              .tableDataPath(tableWithUpdatedSchema.getDataPath())
-              .hudiSourceConfig(
-                  
HudiSourceConfigImpl.builder().partitionFieldSpecConfig(partitionConfig).build())
-              .syncMode(syncMode)
-              .build();
+      ConversionConfig conversionConfig =
+          getTableSyncConfig(
+              sourceTableFormat,
+              syncMode,
+              tableName,
+              tableWithUpdatedSchema,
+              targetTableFormats,
+              partitionConfig,
+              null);
       List<Row> insertsAfterSchemaUpdate = 
tableWithUpdatedSchema.insertRows(100);
       tableWithUpdatedSchema.reload();
-      conversionController.sync(perTableConfig, conversionSourceProvider);
+      conversionController.sync(conversionConfig, conversionSourceProvider);
       checkDatasetEquivalence(sourceTableFormat, tableWithUpdatedSchema, 
targetTableFormats, 280);
 
       tableWithUpdatedSchema.deleteRows(insertsAfterSchemaUpdate.subList(60, 
90));
-      conversionController.sync(perTableConfig, conversionSourceProvider);
+      conversionController.sync(conversionConfig, conversionSourceProvider);
       checkDatasetEquivalence(sourceTableFormat, tableWithUpdatedSchema, 
targetTableFormats, 250);
 
       if (isPartitioned) {
         // Adds new partition.
         tableWithUpdatedSchema.insertRecordsForSpecialPartition(50);
-        conversionController.sync(perTableConfig, conversionSourceProvider);
+        conversionController.sync(conversionConfig, conversionSourceProvider);
         checkDatasetEquivalence(sourceTableFormat, tableWithUpdatedSchema, 
targetTableFormats, 300);
 
         // Drops partition.
         tableWithUpdatedSchema.deleteSpecialPartition();
-        conversionController.sync(perTableConfig, conversionSourceProvider);
+        conversionController.sync(conversionConfig, conversionSourceProvider);
         checkDatasetEquivalence(sourceTableFormat, tableWithUpdatedSchema, 
targetTableFormats, 250);
 
         // Insert records to the dropped partition again.
         tableWithUpdatedSchema.insertRecordsForSpecialPartition(50);
-        conversionController.sync(perTableConfig, conversionSourceProvider);
+        conversionController.sync(conversionConfig, conversionSourceProvider);
         checkDatasetEquivalence(sourceTableFormat, tableWithUpdatedSchema, 
targetTableFormats, 300);
       }
     }
@@ -279,24 +280,22 @@ public class ITConversionController {
       String commitInstant2 = table.startCommit();
       table.insertRecordsWithCommitAlreadyStarted(insertsForCommit2, 
commitInstant2, true);
 
-      PerTableConfig perTableConfig =
-          PerTableConfigImpl.builder()
-              .tableName(tableName)
-              .targetTableFormats(targetTableFormats)
-              .tableBasePath(table.getBasePath())
-              .hudiSourceConfig(
-                  HudiSourceConfigImpl.builder()
-                      
.partitionFieldSpecConfig(partitionConfig.getXTableConfig())
-                      .build())
-              .syncMode(syncMode)
-              .build();
+      ConversionConfig conversionConfig =
+          getTableSyncConfig(
+              HUDI,
+              syncMode,
+              tableName,
+              table,
+              targetTableFormats,
+              partitionConfig.getXTableConfig(),
+              null);
       ConversionController conversionController =
           new ConversionController(jsc.hadoopConfiguration());
-      conversionController.sync(perTableConfig, conversionSourceProvider);
+      conversionController.sync(conversionConfig, conversionSourceProvider);
 
       checkDatasetEquivalence(HUDI, table, targetTableFormats, 50);
       table.insertRecordsWithCommitAlreadyStarted(insertsForCommit1, 
commitInstant1, true);
-      conversionController.sync(perTableConfig, conversionSourceProvider);
+      conversionController.sync(conversionConfig, conversionSourceProvider);
       checkDatasetEquivalence(HUDI, table, targetTableFormats, 100);
     }
   }
@@ -314,20 +313,18 @@ public class ITConversionController {
             tableName, tempDir, jsc, partitionConfig.getHudiConfig(), 
tableType)) {
       List<HoodieRecord<HoodieAvroPayload>> insertedRecords1 = 
table.insertRecords(50, true);
 
-      PerTableConfig perTableConfig =
-          PerTableConfigImpl.builder()
-              .tableName(tableName)
-              .targetTableFormats(targetTableFormats)
-              .tableBasePath(table.getBasePath())
-              .hudiSourceConfig(
-                  HudiSourceConfigImpl.builder()
-                      
.partitionFieldSpecConfig(partitionConfig.getXTableConfig())
-                      .build())
-              .syncMode(syncMode)
-              .build();
+      ConversionConfig conversionConfig =
+          getTableSyncConfig(
+              HUDI,
+              syncMode,
+              tableName,
+              table,
+              targetTableFormats,
+              partitionConfig.getXTableConfig(),
+              null);
       ConversionController conversionController =
           new ConversionController(jsc.hadoopConfiguration());
-      conversionController.sync(perTableConfig, conversionSourceProvider);
+      conversionController.sync(conversionConfig, conversionSourceProvider);
       checkDatasetEquivalence(HUDI, table, targetTableFormats, 50);
 
       table.deleteRecords(insertedRecords1.subList(0, 20), true);
@@ -335,7 +332,7 @@ public class ITConversionController {
       String scheduledCompactionInstant = table.onlyScheduleCompaction();
 
       table.insertRecords(50, true);
-      conversionController.sync(perTableConfig, conversionSourceProvider);
+      conversionController.sync(conversionConfig, conversionSourceProvider);
       Map<String, String> sourceHudiOptions =
           Collections.singletonMap("hoodie.datasource.query.type", 
"read_optimized");
       // Because compaction is not completed yet and read optimized query, 
there are 100 records.
@@ -343,13 +340,13 @@ public class ITConversionController {
           HUDI, table, sourceHudiOptions, targetTableFormats, 
Collections.emptyMap(), 100);
 
       table.insertRecords(50, true);
-      conversionController.sync(perTableConfig, conversionSourceProvider);
+      conversionController.sync(conversionConfig, conversionSourceProvider);
       // Because compaction is not completed yet and read optimized query, 
there are 150 records.
       checkDatasetEquivalence(
           HUDI, table, sourceHudiOptions, targetTableFormats, 
Collections.emptyMap(), 150);
 
       table.completeScheduledCompaction(scheduledCompactionInstant);
-      conversionController.sync(perTableConfig, conversionSourceProvider);
+      conversionController.sync(conversionConfig, conversionSourceProvider);
       checkDatasetEquivalence(HUDI, table, targetTableFormats, 130);
     }
   }
@@ -362,31 +359,32 @@ public class ITConversionController {
         GenericTable.getInstance(tableName, tempDir, sparkSession, jsc, 
sourceTableFormat, false)) {
       table.insertRows(50);
       List<String> targetTableFormats = getOtherFormats(sourceTableFormat);
-      PerTableConfig perTableConfig =
-          PerTableConfigImpl.builder()
-              .tableName(tableName)
-              .targetTableFormats(targetTableFormats)
-              .tableBasePath(table.getBasePath())
-              .tableDataPath(table.getDataPath())
-              .syncMode(SyncMode.INCREMENTAL)
-              .build();
+      ConversionConfig conversionConfig =
+          getTableSyncConfig(
+              sourceTableFormat,
+              SyncMode.INCREMENTAL,
+              tableName,
+              table,
+              targetTableFormats,
+              null,
+              null);
       ConversionSourceProvider<?> conversionSourceProvider =
           getConversionSourceProvider(sourceTableFormat);
       ConversionController conversionController =
           new ConversionController(jsc.hadoopConfiguration());
-      conversionController.sync(perTableConfig, conversionSourceProvider);
+      conversionController.sync(conversionConfig, conversionSourceProvider);
       Instant instantAfterFirstSync = Instant.now();
       // sleep before starting the next commit to avoid any rounding issues
       Thread.sleep(1000);
 
       table.insertRows(50);
-      conversionController.sync(perTableConfig, conversionSourceProvider);
+      conversionController.sync(conversionConfig, conversionSourceProvider);
       Instant instantAfterSecondSync = Instant.now();
       // sleep before starting the next commit to avoid any rounding issues
       Thread.sleep(1000);
 
       table.insertRows(50);
-      conversionController.sync(perTableConfig, conversionSourceProvider);
+      conversionController.sync(conversionConfig, conversionSourceProvider);
 
       checkDatasetEquivalence(
           sourceTableFormat,
@@ -485,25 +483,22 @@ public class ITConversionController {
           GenericTable.getInstance(tableName, tempDir, sparkSession, jsc, 
sourceTableFormat, true);
     }
     try (GenericTable tableToClose = table) {
-      PerTableConfig perTableConfig =
-          PerTableConfigImpl.builder()
-              .tableName(tableName)
-              .targetTableFormats(targetTableFormats)
-              .tableBasePath(tableToClose.getBasePath())
-              .tableDataPath(tableToClose.getDataPath())
-              .hudiSourceConfig(
-                  HudiSourceConfigImpl.builder()
-                      .partitionFieldSpecConfig(xTablePartitionConfig)
-                      .build())
-              .syncMode(SyncMode.INCREMENTAL)
-              .build();
+      ConversionConfig conversionConfig =
+          getTableSyncConfig(
+              sourceTableFormat,
+              SyncMode.INCREMENTAL,
+              tableName,
+              table,
+              targetTableFormats,
+              xTablePartitionConfig,
+              null);
       tableToClose.insertRows(100);
       ConversionController conversionController =
           new ConversionController(jsc.hadoopConfiguration());
-      conversionController.sync(perTableConfig, conversionSourceProvider);
+      conversionController.sync(conversionConfig, conversionSourceProvider);
       // Do a second sync to force the test to read back the metadata it wrote 
earlier
       tableToClose.insertRows(100);
-      conversionController.sync(perTableConfig, conversionSourceProvider);
+      conversionController.sync(conversionConfig, conversionSourceProvider);
 
       checkDatasetEquivalenceWithFilter(
           sourceTableFormat, tableToClose, targetTableFormats, filter);
@@ -520,33 +515,23 @@ public class ITConversionController {
             tableName, tempDir, null, HoodieTableType.COPY_ON_WRITE)) {
       table.insertRecords(100, true);
 
-      PerTableConfig perTableConfigIceberg =
-          PerTableConfigImpl.builder()
-              .tableName(tableName)
-              .targetTableFormats(ImmutableList.of(ICEBERG))
-              .tableBasePath(table.getBasePath())
-              .syncMode(syncMode)
-              .build();
-
-      PerTableConfig perTableConfigDelta =
-          PerTableConfigImpl.builder()
-              .tableName(tableName)
-              .targetTableFormats(ImmutableList.of(DELTA))
-              .tableBasePath(table.getBasePath())
-              .syncMode(syncMode)
-              .build();
+      ConversionConfig conversionConfigIceberg =
+          getTableSyncConfig(
+              HUDI, syncMode, tableName, table, ImmutableList.of(ICEBERG), 
null, null);
+      ConversionConfig conversionConfigDelta =
+          getTableSyncConfig(HUDI, syncMode, tableName, table, 
ImmutableList.of(DELTA), null, null);
 
       ConversionController conversionController =
           new ConversionController(jsc.hadoopConfiguration());
-      conversionController.sync(perTableConfigIceberg, 
conversionSourceProvider);
+      conversionController.sync(conversionConfigIceberg, 
conversionSourceProvider);
       checkDatasetEquivalence(HUDI, table, Collections.singletonList(ICEBERG), 
100);
-      conversionController.sync(perTableConfigDelta, conversionSourceProvider);
+      conversionController.sync(conversionConfigDelta, 
conversionSourceProvider);
       checkDatasetEquivalence(HUDI, table, Collections.singletonList(DELTA), 
100);
 
       table.insertRecords(100, true);
-      conversionController.sync(perTableConfigIceberg, 
conversionSourceProvider);
+      conversionController.sync(conversionConfigIceberg, 
conversionSourceProvider);
       checkDatasetEquivalence(HUDI, table, Collections.singletonList(ICEBERG), 
200);
-      conversionController.sync(perTableConfigDelta, conversionSourceProvider);
+      conversionController.sync(conversionConfigDelta, 
conversionSourceProvider);
       checkDatasetEquivalence(HUDI, table, Collections.singletonList(DELTA), 
200);
     }
   }
@@ -558,21 +543,18 @@ public class ITConversionController {
     try (TestJavaHudiTable table =
         TestJavaHudiTable.forStandardSchema(
             tableName, tempDir, null, HoodieTableType.COPY_ON_WRITE)) {
-      PerTableConfig singleTableConfig =
-          PerTableConfigImpl.builder()
-              .tableName(tableName)
-              .targetTableFormats(ImmutableList.of(ICEBERG))
-              .tableBasePath(table.getBasePath())
-              .syncMode(SyncMode.INCREMENTAL)
-              .build();
-
-      PerTableConfig dualTableConfig =
-          PerTableConfigImpl.builder()
-              .tableName(tableName)
-              .targetTableFormats(Arrays.asList(ICEBERG, DELTA))
-              .tableBasePath(table.getBasePath())
-              .syncMode(SyncMode.INCREMENTAL)
-              .build();
+      ConversionConfig singleTableConfig =
+          getTableSyncConfig(
+              HUDI, SyncMode.INCREMENTAL, tableName, table, 
ImmutableList.of(ICEBERG), null, null);
+      ConversionConfig dualTableConfig =
+          getTableSyncConfig(
+              HUDI,
+              SyncMode.INCREMENTAL,
+              tableName,
+              table,
+              Arrays.asList(ICEBERG, DELTA),
+              null,
+              null);
 
       table.insertRecords(50, true);
       ConversionController conversionController =
@@ -612,18 +594,20 @@ public class ITConversionController {
       table.insertRows(20);
       ConversionController conversionController =
           new ConversionController(jsc.hadoopConfiguration());
-      PerTableConfig perTableConfig =
-          PerTableConfigImpl.builder()
-              .tableName(tableName)
-              .targetTableFormats(Collections.singletonList(ICEBERG))
-              .tableBasePath(table.getBasePath())
-              .syncMode(SyncMode.INCREMENTAL)
-              .build();
-      conversionController.sync(perTableConfig, conversionSourceProvider);
+      ConversionConfig conversionConfig =
+          getTableSyncConfig(
+              HUDI,
+              SyncMode.INCREMENTAL,
+              tableName,
+              table,
+              Collections.singletonList(ICEBERG),
+              null,
+              null);
+      conversionController.sync(conversionConfig, conversionSourceProvider);
       table.insertRows(10);
-      conversionController.sync(perTableConfig, conversionSourceProvider);
+      conversionController.sync(conversionConfig, conversionSourceProvider);
       table.insertRows(10);
-      conversionController.sync(perTableConfig, conversionSourceProvider);
+      conversionController.sync(conversionConfig, conversionSourceProvider);
       // corrupt last two snapshots
       Table icebergTable = new 
HadoopTables(jsc.hadoopConfiguration()).load(table.getBasePath());
       long currentSnapshotId = icebergTable.currentSnapshot().snapshotId();
@@ -633,7 +617,7 @@ public class ITConversionController {
       Files.delete(
           
Paths.get(URI.create(icebergTable.snapshot(previousSnapshotId).manifestListLocation())));
       table.insertRows(10);
-      conversionController.sync(perTableConfig, conversionSourceProvider);
+      conversionController.sync(conversionConfig, conversionSourceProvider);
       checkDatasetEquivalence(HUDI, table, Collections.singletonList(ICEBERG), 
50);
     }
   }
@@ -645,18 +629,19 @@ public class ITConversionController {
     try (TestJavaHudiTable table =
         TestJavaHudiTable.forStandardSchema(
             tableName, tempDir, null, HoodieTableType.COPY_ON_WRITE)) {
-      PerTableConfig perTableConfig =
-          PerTableConfigImpl.builder()
-              .tableName(tableName)
-              .targetTableFormats(Arrays.asList(ICEBERG, DELTA))
-              .tableBasePath(table.getBasePath())
-              .syncMode(SyncMode.INCREMENTAL)
-              .targetMetadataRetentionInHours(0) // force cleanup
-              .build();
+      ConversionConfig conversionConfig =
+          getTableSyncConfig(
+              HUDI,
+              SyncMode.INCREMENTAL,
+              tableName,
+              table,
+              Arrays.asList(ICEBERG, DELTA),
+              null,
+              Duration.ofHours(0)); // force cleanup
       ConversionController conversionController =
           new ConversionController(jsc.hadoopConfiguration());
       table.insertRecords(10, true);
-      conversionController.sync(perTableConfig, conversionSourceProvider);
+      conversionController.sync(conversionConfig, conversionSourceProvider);
       // later we will ensure we can still read the source table at this 
instant to ensure that
       // neither target cleaned up the underlying parquet files in the table
       Instant instantAfterFirstCommit = Instant.now();
@@ -667,7 +652,7 @@ public class ITConversionController {
           .forEach(
               unused -> {
                 table.insertRecords(10, true);
-                conversionController.sync(perTableConfig, 
conversionSourceProvider);
+                conversionController.sync(conversionConfig, 
conversionSourceProvider);
               });
       // ensure that hudi rows can still be read and underlying files were not 
removed
       List<Row> rows =
@@ -859,4 +844,45 @@ public class ITConversionController {
     Optional<String> hudiSourceConfig;
     String filter;
   }
+
+  private static ConversionConfig getTableSyncConfig(
+      String sourceTableFormat,
+      SyncMode syncMode,
+      String tableName,
+      GenericTable table,
+      List<String> targetTableFormats,
+      String partitionConfig,
+      Duration metadataRetention) {
+    Properties sourceProperties = new Properties();
+    if (partitionConfig != null) {
+      sourceProperties.put(PARTITION_FIELD_SPEC_CONFIG, partitionConfig);
+    }
+    SourceTable sourceTable =
+        SourceTable.builder()
+            .name(tableName)
+            .formatName(sourceTableFormat)
+            .basePath(table.getBasePath())
+            .dataPath(table.getDataPath())
+            .additionalProperties(sourceProperties)
+            .build();
+
+    List<TargetTable> targetTables =
+        targetTableFormats.stream()
+            .map(
+                formatName ->
+                    TargetTable.builder()
+                        .name(tableName)
+                        .formatName(formatName)
+                        // set the metadata path to the data path as the 
default (required by Hudi)
+                        .basePath(table.getDataPath())
+                        .metadataRetention(metadataRetention)
+                        .build())
+            .collect(Collectors.toList());
+
+    return ConversionConfig.builder()
+        .sourceTable(sourceTable)
+        .targetTables(targetTables)
+        .syncMode(syncMode)
+        .build();
+  }
 }
diff --git 
a/xtable-core/src/test/java/org/apache/xtable/conversion/TestConversionConfig.java
 
b/xtable-core/src/test/java/org/apache/xtable/conversion/TestConversionConfig.java
new file mode 100644
index 00000000..6e502af0
--- /dev/null
+++ 
b/xtable-core/src/test/java/org/apache/xtable/conversion/TestConversionConfig.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.xtable.conversion;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.Mockito.mock;
+
+import java.util.Collections;
+
+import org.junit.jupiter.api.Test;
+
+import org.apache.xtable.model.sync.SyncMode;
+
+class TestConversionConfig {
+
+  @Test
+  void defaultValueSet() {
+    ConversionConfig conversionConfig =
+        ConversionConfig.builder()
+            .sourceTable(mock(SourceTable.class))
+            .targetTables(Collections.singletonList(mock(TargetTable.class)))
+            .build();
+
+    assertEquals(SyncMode.INCREMENTAL, conversionConfig.getSyncMode());
+  }
+
+  @Test
+  void errorIfSourceTableNotSet() {
+    assertThrows(
+        NullPointerException.class,
+        () ->
+            ConversionConfig.builder()
+                
.targetTables(Collections.singletonList(mock(TargetTable.class)))
+                .build());
+  }
+
+  @Test
+  void errorIfNoTargetsSet() {
+    Exception thrownException =
+        assertThrows(
+            IllegalArgumentException.class,
+            () ->
+                ConversionConfig.builder()
+                    .sourceTable(mock(SourceTable.class))
+                    .targetTables(Collections.emptyList())
+                    .build());
+    assertEquals("Please provide at-least one format to sync", 
thrownException.getMessage());
+  }
+}
diff --git 
a/xtable-core/src/test/java/org/apache/xtable/conversion/TestConversionController.java
 
b/xtable-core/src/test/java/org/apache/xtable/conversion/TestConversionController.java
index 558b8ee5..652bbe42 100644
--- 
a/xtable-core/src/test/java/org/apache/xtable/conversion/TestConversionController.java
+++ 
b/xtable-core/src/test/java/org/apache/xtable/conversion/TestConversionController.java
@@ -18,7 +18,7 @@
  
 package org.apache.xtable.conversion;
 
-import static org.apache.xtable.GenericTable.getTableName;
+import static org.apache.xtable.model.storage.TableFormat.HUDI;
 import static org.junit.jupiter.api.Assertions.*;
 import static org.mockito.ArgumentMatchers.argThat;
 import static org.mockito.ArgumentMatchers.eq;
@@ -79,13 +79,16 @@ public class TestConversionController {
     Map<String, SyncResult> perTableResults = new HashMap<>();
     perTableResults.put(TableFormat.ICEBERG, syncResult);
     perTableResults.put(TableFormat.DELTA, syncResult);
-    PerTableConfig perTableConfig =
-        getPerTableConfig(Arrays.asList(TableFormat.ICEBERG, 
TableFormat.DELTA), syncMode);
-    
when(mockConversionSourceProvider.getConversionSourceInstance(perTableConfig))
+    ConversionConfig conversionConfig =
+        getTableSyncConfig(Arrays.asList(TableFormat.ICEBERG, 
TableFormat.DELTA), syncMode);
+    when(mockConversionSourceProvider.getConversionSourceInstance(
+            conversionConfig.getSourceTable()))
         .thenReturn(mockConversionSource);
-    when(mockConversionTargetFactory.createForFormat(TableFormat.ICEBERG, 
perTableConfig, mockConf))
+    when(mockConversionTargetFactory.createForFormat(
+            conversionConfig.getTargetTables().get(0), mockConf))
         .thenReturn(mockConversionTarget1);
-    when(mockConversionTargetFactory.createForFormat(TableFormat.DELTA, 
perTableConfig, mockConf))
+    when(mockConversionTargetFactory.createForFormat(
+            conversionConfig.getTargetTables().get(1), mockConf))
         .thenReturn(mockConversionTarget2);
     
when(mockConversionSource.getCurrentSnapshot()).thenReturn(internalSnapshot);
     when(tableFormatSync.syncSnapshot(
@@ -95,20 +98,23 @@ public class TestConversionController {
     ConversionController conversionController =
         new ConversionController(mockConf, mockConversionTargetFactory, 
tableFormatSync);
     Map<String, SyncResult> result =
-        conversionController.sync(perTableConfig, 
mockConversionSourceProvider);
+        conversionController.sync(conversionConfig, 
mockConversionSourceProvider);
     assertEquals(perTableResults, result);
   }
 
   @Test
   void testAllIncrementalSyncAsPerConfigAndNoFallbackNecessary() {
     SyncMode syncMode = SyncMode.INCREMENTAL;
-    PerTableConfig perTableConfig =
-        getPerTableConfig(Arrays.asList(TableFormat.ICEBERG, 
TableFormat.DELTA), syncMode);
-    
when(mockConversionSourceProvider.getConversionSourceInstance(perTableConfig))
+    ConversionConfig conversionConfig =
+        getTableSyncConfig(Arrays.asList(TableFormat.ICEBERG, 
TableFormat.DELTA), syncMode);
+    when(mockConversionSourceProvider.getConversionSourceInstance(
+            conversionConfig.getSourceTable()))
         .thenReturn(mockConversionSource);
-    when(mockConversionTargetFactory.createForFormat(TableFormat.ICEBERG, 
perTableConfig, mockConf))
+    when(mockConversionTargetFactory.createForFormat(
+            conversionConfig.getTargetTables().get(0), mockConf))
         .thenReturn(mockConversionTarget1);
-    when(mockConversionTargetFactory.createForFormat(TableFormat.DELTA, 
perTableConfig, mockConf))
+    when(mockConversionTargetFactory.createForFormat(
+            conversionConfig.getTargetTables().get(1), mockConf))
         .thenReturn(mockConversionTarget2);
 
     Instant instantAsOfNow = Instant.now();
@@ -178,7 +184,7 @@ public class TestConversionController {
     ConversionController conversionController =
         new ConversionController(mockConf, mockConversionTargetFactory, 
tableFormatSync);
     Map<String, SyncResult> result =
-        conversionController.sync(perTableConfig, 
mockConversionSourceProvider);
+        conversionController.sync(conversionConfig, 
mockConversionSourceProvider);
     assertEquals(expectedSyncResult, result);
   }
 
@@ -192,13 +198,16 @@ public class TestConversionController {
     Map<String, SyncResult> syncResults = new HashMap<>();
     syncResults.put(TableFormat.ICEBERG, syncResult);
     syncResults.put(TableFormat.DELTA, syncResult);
-    PerTableConfig perTableConfig =
-        getPerTableConfig(Arrays.asList(TableFormat.ICEBERG, 
TableFormat.DELTA), syncMode);
-    
when(mockConversionSourceProvider.getConversionSourceInstance(perTableConfig))
+    ConversionConfig conversionConfig =
+        getTableSyncConfig(Arrays.asList(TableFormat.ICEBERG, 
TableFormat.DELTA), syncMode);
+    when(mockConversionSourceProvider.getConversionSourceInstance(
+            conversionConfig.getSourceTable()))
         .thenReturn(mockConversionSource);
-    when(mockConversionTargetFactory.createForFormat(TableFormat.ICEBERG, 
perTableConfig, mockConf))
+    when(mockConversionTargetFactory.createForFormat(
+            conversionConfig.getTargetTables().get(0), mockConf))
         .thenReturn(mockConversionTarget1);
-    when(mockConversionTargetFactory.createForFormat(TableFormat.DELTA, 
perTableConfig, mockConf))
+    when(mockConversionTargetFactory.createForFormat(
+            conversionConfig.getTargetTables().get(1), mockConf))
         .thenReturn(mockConversionTarget2);
 
     Instant instantAsOfNow = Instant.now();
@@ -219,20 +228,23 @@ public class TestConversionController {
     ConversionController conversionController =
         new ConversionController(mockConf, mockConversionTargetFactory, 
tableFormatSync);
     Map<String, SyncResult> result =
-        conversionController.sync(perTableConfig, 
mockConversionSourceProvider);
+        conversionController.sync(conversionConfig, 
mockConversionSourceProvider);
     assertEquals(syncResults, result);
   }
 
   @Test
   void testIncrementalSyncFallbackToSnapshotForOnlySingleFormat() {
     SyncMode syncMode = SyncMode.INCREMENTAL;
-    PerTableConfig perTableConfig =
-        getPerTableConfig(Arrays.asList(TableFormat.ICEBERG, 
TableFormat.DELTA), syncMode);
-    
when(mockConversionSourceProvider.getConversionSourceInstance(perTableConfig))
+    ConversionConfig conversionConfig =
+        getTableSyncConfig(Arrays.asList(TableFormat.ICEBERG, 
TableFormat.DELTA), syncMode);
+    when(mockConversionSourceProvider.getConversionSourceInstance(
+            conversionConfig.getSourceTable()))
         .thenReturn(mockConversionSource);
-    when(mockConversionTargetFactory.createForFormat(TableFormat.ICEBERG, 
perTableConfig, mockConf))
+    when(mockConversionTargetFactory.createForFormat(
+            conversionConfig.getTargetTables().get(0), mockConf))
         .thenReturn(mockConversionTarget1);
-    when(mockConversionTargetFactory.createForFormat(TableFormat.DELTA, 
perTableConfig, mockConf))
+    when(mockConversionTargetFactory.createForFormat(
+            conversionConfig.getTargetTables().get(1), mockConf))
         .thenReturn(mockConversionTarget2);
 
     Instant instantAsOfNow = Instant.now();
@@ -300,20 +312,23 @@ public class TestConversionController {
     ConversionController conversionController =
         new ConversionController(mockConf, mockConversionTargetFactory, 
tableFormatSync);
     Map<String, SyncResult> result =
-        conversionController.sync(perTableConfig, 
mockConversionSourceProvider);
+        conversionController.sync(conversionConfig, 
mockConversionSourceProvider);
     assertEquals(expectedSyncResult, result);
   }
 
   @Test
   void incrementalSyncWithNoPendingInstantsForAllFormats() {
     SyncMode syncMode = SyncMode.INCREMENTAL;
-    PerTableConfig perTableConfig =
-        getPerTableConfig(Arrays.asList(TableFormat.ICEBERG, 
TableFormat.DELTA), syncMode);
-    
when(mockConversionSourceProvider.getConversionSourceInstance(perTableConfig))
+    ConversionConfig conversionConfig =
+        getTableSyncConfig(Arrays.asList(TableFormat.ICEBERG, 
TableFormat.DELTA), syncMode);
+    when(mockConversionSourceProvider.getConversionSourceInstance(
+            conversionConfig.getSourceTable()))
         .thenReturn(mockConversionSource);
-    when(mockConversionTargetFactory.createForFormat(TableFormat.ICEBERG, 
perTableConfig, mockConf))
+    when(mockConversionTargetFactory.createForFormat(
+            conversionConfig.getTargetTables().get(0), mockConf))
         .thenReturn(mockConversionTarget1);
-    when(mockConversionTargetFactory.createForFormat(TableFormat.DELTA, 
perTableConfig, mockConf))
+    when(mockConversionTargetFactory.createForFormat(
+            conversionConfig.getTargetTables().get(1), mockConf))
         .thenReturn(mockConversionTarget2);
 
     Instant instantAsOfNow = Instant.now();
@@ -355,7 +370,7 @@ public class TestConversionController {
     ConversionController conversionController =
         new ConversionController(mockConf, mockConversionTargetFactory, 
tableFormatSync);
     Map<String, SyncResult> result =
-        conversionController.sync(perTableConfig, 
mockConversionSourceProvider);
+        conversionController.sync(conversionConfig, 
mockConversionSourceProvider);
     assertEquals(expectedSyncResult, result);
   }
 
@@ -394,14 +409,31 @@ public class TestConversionController {
   }
 
   private Instant getInstantAtLastNMinutes(Instant currentInstant, int n) {
-    return Instant.now().minus(Duration.ofMinutes(n));
+    return currentInstant.minus(Duration.ofMinutes(n));
   }
 
-  private PerTableConfig getPerTableConfig(List<String> targetTableFormats, 
SyncMode syncMode) {
-    return PerTableConfigImpl.builder()
-        .tableName(getTableName())
-        .tableBasePath("/tmp/doesnt/matter")
-        .targetTableFormats(targetTableFormats)
+  private ConversionConfig getTableSyncConfig(List<String> targetTableFormats, 
SyncMode syncMode) {
+    SourceTable sourceTable =
+        SourceTable.builder()
+            .name("tablename")
+            .formatName(HUDI)
+            .basePath("/tmp/doesnt/matter")
+            .build();
+
+    List<TargetTable> targetTables =
+        targetTableFormats.stream()
+            .map(
+                formatName ->
+                    TargetTable.builder()
+                        .name("tablename")
+                        .formatName(formatName)
+                        .basePath("/tmp/doesnt/matter")
+                        .build())
+            .collect(Collectors.toList());
+
+    return ConversionConfig.builder()
+        .sourceTable(sourceTable)
+        .targetTables(targetTables)
         .syncMode(syncMode)
         .build();
   }
diff --git 
a/xtable-core/src/test/java/org/apache/xtable/conversion/TestConversionTargetFactory.java
 
b/xtable-core/src/test/java/org/apache/xtable/conversion/TestConversionTargetFactory.java
index 34d3cbef..0984b42b 100644
--- 
a/xtable-core/src/test/java/org/apache/xtable/conversion/TestConversionTargetFactory.java
+++ 
b/xtable-core/src/test/java/org/apache/xtable/conversion/TestConversionTargetFactory.java
@@ -24,15 +24,11 @@ import static 
org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
-import java.util.Arrays;
-import java.util.List;
-
 import org.apache.hadoop.conf.Configuration;
 import org.junit.jupiter.api.Test;
 
 import org.apache.xtable.exception.NotSupportedException;
 import org.apache.xtable.model.storage.TableFormat;
-import org.apache.xtable.model.sync.SyncMode;
 import org.apache.xtable.spi.sync.ConversionTarget;
 
 public class TestConversionTargetFactory {
@@ -42,11 +38,10 @@ public class TestConversionTargetFactory {
     ConversionTarget tc =
         
ConversionTargetFactory.getInstance().createConversionTargetForName(TableFormat.DELTA);
     assertNotNull(tc);
-    PerTableConfig perTableConfig =
-        getPerTableConfig(Arrays.asList(TableFormat.DELTA), 
SyncMode.INCREMENTAL);
+    TargetTable targetTable = getPerTableConfig(TableFormat.DELTA);
     Configuration conf = new Configuration();
     conf.set("spark.master", "local");
-    tc.init(perTableConfig, conf);
+    tc.init(targetTable, conf);
     assertEquals(tc.getTableFormat(), TableFormat.DELTA);
   }
 
@@ -55,11 +50,10 @@ public class TestConversionTargetFactory {
     ConversionTarget tc =
         
ConversionTargetFactory.getInstance().createConversionTargetForName(TableFormat.HUDI);
     assertNotNull(tc);
-    PerTableConfig perTableConfig =
-        getPerTableConfig(Arrays.asList(TableFormat.HUDI), 
SyncMode.INCREMENTAL);
+    TargetTable targetTable = getPerTableConfig(TableFormat.HUDI);
     Configuration conf = new Configuration();
     conf.setStrings("spark.master", "local");
-    tc.init(perTableConfig, conf);
+    tc.init(targetTable, conf);
     assertEquals(tc.getTableFormat(), TableFormat.HUDI);
   }
 
@@ -68,11 +62,10 @@ public class TestConversionTargetFactory {
     ConversionTarget tc =
         
ConversionTargetFactory.getInstance().createConversionTargetForName(TableFormat.ICEBERG);
     assertNotNull(tc);
-    PerTableConfig perTableConfig =
-        getPerTableConfig(Arrays.asList(TableFormat.ICEBERG), 
SyncMode.INCREMENTAL);
+    TargetTable targetTable = getPerTableConfig(TableFormat.ICEBERG);
     Configuration conf = new Configuration();
     conf.setStrings("spark.master", "local");
-    tc.init(perTableConfig, conf);
+    tc.init(targetTable, conf);
     assertEquals(tc.getTableFormat(), TableFormat.ICEBERG);
   }
 
@@ -88,22 +81,18 @@ public class TestConversionTargetFactory {
 
   @Test
   public void testConversionTargetFromFormatType() {
-    PerTableConfig perTableConfig =
-        getPerTableConfig(Arrays.asList(TableFormat.DELTA), 
SyncMode.INCREMENTAL);
+    TargetTable targetTable = getPerTableConfig(TableFormat.DELTA);
     Configuration conf = new Configuration();
     conf.setStrings("spark.master", "local");
-    ConversionTarget tc =
-        ConversionTargetFactory.getInstance()
-            .createForFormat(TableFormat.DELTA, perTableConfig, conf);
+    ConversionTarget tc = 
ConversionTargetFactory.getInstance().createForFormat(targetTable, conf);
     assertEquals(tc.getTableFormat(), TableFormat.DELTA);
   }
 
-  private PerTableConfig getPerTableConfig(List<String> targetTableFormats, 
SyncMode syncMode) {
-    return PerTableConfigImpl.builder()
-        .tableName(getTableName())
-        .tableBasePath("/tmp/doesnt/matter")
-        .targetTableFormats(targetTableFormats)
-        .syncMode(syncMode)
+  private TargetTable getPerTableConfig(String tableFormat) {
+    return TargetTable.builder()
+        .name(getTableName())
+        .basePath("/tmp/doesnt/matter")
+        .formatName(tableFormat)
         .build();
   }
 }
diff --git 
a/xtable-core/src/test/java/org/apache/xtable/conversion/TestPerTableConfig.java
 
b/xtable-core/src/test/java/org/apache/xtable/conversion/TestPerTableConfig.java
deleted file mode 100644
index 4f057385..00000000
--- 
a/xtable-core/src/test/java/org/apache/xtable/conversion/TestPerTableConfig.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- 
-package org.apache.xtable.conversion;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertNull;
-import static org.junit.jupiter.api.Assertions.assertThrows;
-
-import java.util.Collections;
-
-import org.junit.jupiter.api.Test;
-
-import org.apache.xtable.hudi.HudiSourceConfigImpl;
-import org.apache.xtable.model.storage.TableFormat;
-import org.apache.xtable.model.sync.SyncMode;
-
-class TestPerTableConfig {
-
-  @Test
-  void sanitizePath() {
-    PerTableConfig tooManySlashes =
-        PerTableConfigImpl.builder()
-            .tableBasePath("s3://bucket//path")
-            .tableName("name")
-            .targetTableFormats(Collections.singletonList(TableFormat.ICEBERG))
-            .build();
-    assertEquals("s3://bucket/path", tooManySlashes.getTableBasePath());
-
-    PerTableConfig localFilePath =
-        PerTableConfigImpl.builder()
-            .tableBasePath("/local/data//path")
-            .tableName("name")
-            .targetTableFormats(Collections.singletonList(TableFormat.ICEBERG))
-            .build();
-    assertEquals("file:///local/data/path", localFilePath.getTableBasePath());
-
-    PerTableConfig properLocalFilePath =
-        PerTableConfigImpl.builder()
-            .tableBasePath("file:///local/data//path")
-            .tableName("name")
-            .targetTableFormats(Collections.singletonList(TableFormat.ICEBERG))
-            .build();
-    assertEquals("file:///local/data/path", 
properLocalFilePath.getTableBasePath());
-  }
-
-  @Test
-  void defaultValueSet() {
-    PerTableConfig perTableConfig =
-        PerTableConfigImpl.builder()
-            .tableBasePath("file://bucket/path")
-            .tableName("name")
-            .targetTableFormats(Collections.singletonList(TableFormat.ICEBERG))
-            .build();
-
-    assertEquals(24 * 7, perTableConfig.getTargetMetadataRetentionInHours());
-    assertEquals(SyncMode.INCREMENTAL, perTableConfig.getSyncMode());
-    assertEquals(HudiSourceConfigImpl.builder().build(), 
perTableConfig.getHudiSourceConfig());
-    assertNull(perTableConfig.getNamespace());
-    assertNull(perTableConfig.getIcebergCatalogConfig());
-  }
-
-  @Test
-  void errorIfRequiredArgsNotSet() {
-    assertThrows(
-        NullPointerException.class,
-        () ->
-            PerTableConfigImpl.builder()
-                .tableName("name")
-                
.targetTableFormats(Collections.singletonList(TableFormat.ICEBERG))
-                .build());
-
-    assertThrows(
-        NullPointerException.class,
-        () ->
-            PerTableConfigImpl.builder()
-                .tableBasePath("file://bucket/path")
-                
.targetTableFormats(Collections.singletonList(TableFormat.ICEBERG))
-                .build());
-
-    assertThrows(
-        NullPointerException.class,
-        () ->
-            PerTableConfigImpl.builder()
-                .tableBasePath("file://bucket/path")
-                .tableName("name")
-                .build());
-  }
-
-  @Test
-  void errorIfNoTargetsSet() {
-    Exception thrownException =
-        assertThrows(
-            IllegalArgumentException.class,
-            () ->
-                PerTableConfigImpl.builder()
-                    .tableName("name")
-                    .tableBasePath("file://bucket/path")
-                    .targetTableFormats(Collections.emptyList())
-                    .build());
-    assertEquals("Please provide at-least one format to sync", 
thrownException.getMessage());
-  }
-}
diff --git 
a/xtable-core/src/test/java/org/apache/xtable/delta/ITDeltaConversionTargetSource.java
 
b/xtable-core/src/test/java/org/apache/xtable/delta/ITDeltaConversionTargetSource.java
index 3e2d61f5..8fcf0753 100644
--- 
a/xtable-core/src/test/java/org/apache/xtable/delta/ITDeltaConversionTargetSource.java
+++ 
b/xtable-core/src/test/java/org/apache/xtable/delta/ITDeltaConversionTargetSource.java
@@ -53,8 +53,7 @@ import org.junit.jupiter.params.provider.MethodSource;
 import org.apache.xtable.GenericTable;
 import org.apache.xtable.TestSparkDeltaTable;
 import org.apache.xtable.ValidationTestHelper;
-import org.apache.xtable.conversion.PerTableConfig;
-import org.apache.xtable.conversion.PerTableConfigImpl;
+import org.apache.xtable.conversion.SourceTable;
 import org.apache.xtable.model.CommitsBacklog;
 import org.apache.xtable.model.InstantsForIncrementalSync;
 import org.apache.xtable.model.InternalSnapshot;
@@ -149,7 +148,7 @@ public class ITDeltaConversionTargetSource {
     hadoopConf.set("fs.defaultFS", "file:///");
 
     conversionSourceProvider = new DeltaConversionSourceProvider();
-    conversionSourceProvider.init(hadoopConf, null);
+    conversionSourceProvider.init(hadoopConf);
   }
 
   @Test
@@ -165,11 +164,11 @@ public class ITDeltaConversionTargetSource {
             + basePath
             + "' AS SELECT * FROM VALUES (1, 2)");
     // Create Delta source
-    PerTableConfig tableConfig =
-        PerTableConfigImpl.builder()
-            .tableName(tableName)
-            .tableBasePath(basePath.toString())
-            .targetTableFormats(Collections.singletonList(TableFormat.ICEBERG))
+    SourceTable tableConfig =
+        SourceTable.builder()
+            .name(tableName)
+            .basePath(basePath.toString())
+            .formatName(TableFormat.DELTA)
             .build();
     DeltaConversionSource conversionSource =
         conversionSourceProvider.getConversionSourceInstance(tableConfig);
@@ -223,11 +222,11 @@ public class ITDeltaConversionTargetSource {
             + basePath
             + "' AS SELECT 'SingleValue' AS part_col, 1 AS col1, 2 AS col2");
     // Create Delta source
-    PerTableConfig tableConfig =
-        PerTableConfigImpl.builder()
-            .tableName(tableName)
-            .tableBasePath(basePath.toString())
-            .targetTableFormats(Collections.singletonList(TableFormat.ICEBERG))
+    SourceTable tableConfig =
+        SourceTable.builder()
+            .name(tableName)
+            .basePath(basePath.toString())
+            .formatName(TableFormat.DELTA)
             .build();
     DeltaConversionSource conversionSource =
         conversionSourceProvider.getConversionSourceInstance(tableConfig);
@@ -311,11 +310,11 @@ public class ITDeltaConversionTargetSource {
             + tableName
             + "` VALUES(1, CAST('2012-02-12 00:12:34' AS TIMESTAMP))");
     // Create Delta source
-    PerTableConfig tableConfig =
-        PerTableConfigImpl.builder()
-            .tableName(tableName)
-            .tableBasePath(basePath.toString())
-            .targetTableFormats(Collections.singletonList(TableFormat.ICEBERG))
+    SourceTable tableConfig =
+        SourceTable.builder()
+            .name(tableName)
+            .basePath(basePath.toString())
+            .formatName(TableFormat.DELTA)
             .build();
     DeltaConversionSource conversionSource =
         conversionSourceProvider.getConversionSourceInstance(tableConfig);
@@ -350,11 +349,11 @@ public class ITDeltaConversionTargetSource {
 
     testSparkDeltaTable.insertRows(50);
     allActiveFiles.add(testSparkDeltaTable.getAllActiveFiles());
-    PerTableConfig tableConfig =
-        PerTableConfigImpl.builder()
-            .tableName(testSparkDeltaTable.getTableName())
-            .tableBasePath(testSparkDeltaTable.getBasePath())
-            .targetTableFormats(Arrays.asList(TableFormat.HUDI, 
TableFormat.ICEBERG))
+    SourceTable tableConfig =
+        SourceTable.builder()
+            .name(testSparkDeltaTable.getTableName())
+            .basePath(testSparkDeltaTable.getBasePath())
+            .formatName(TableFormat.DELTA)
             .build();
     DeltaConversionSource conversionSource =
         conversionSourceProvider.getConversionSourceInstance(tableConfig);
@@ -390,11 +389,11 @@ public class ITDeltaConversionTargetSource {
     // Insert 50 rows to 2018 partition.
     List<Row> commit1Rows = testSparkDeltaTable.insertRowsForPartition(50, 
2018);
     Long timestamp1 = testSparkDeltaTable.getLastCommitTimestamp();
-    PerTableConfig tableConfig =
-        PerTableConfigImpl.builder()
-            .tableName(testSparkDeltaTable.getTableName())
-            .tableBasePath(testSparkDeltaTable.getBasePath())
-            .targetTableFormats(Arrays.asList(TableFormat.HUDI, 
TableFormat.ICEBERG))
+    SourceTable tableConfig =
+        SourceTable.builder()
+            .name(testSparkDeltaTable.getTableName())
+            .basePath(testSparkDeltaTable.getBasePath())
+            .formatName(TableFormat.DELTA)
             .build();
     DeltaConversionSource conversionSource =
         conversionSourceProvider.getConversionSourceInstance(tableConfig);
@@ -458,11 +457,11 @@ public class ITDeltaConversionTargetSource {
     testSparkDeltaTable.insertRows(50);
     allActiveFiles.add(testSparkDeltaTable.getAllActiveFiles());
 
-    PerTableConfig tableConfig =
-        PerTableConfigImpl.builder()
-            .tableName(testSparkDeltaTable.getTableName())
-            .tableBasePath(testSparkDeltaTable.getBasePath())
-            .targetTableFormats(Arrays.asList(TableFormat.HUDI, 
TableFormat.ICEBERG))
+    SourceTable tableConfig =
+        SourceTable.builder()
+            .name(testSparkDeltaTable.getTableName())
+            .basePath(testSparkDeltaTable.getBasePath())
+            .formatName(TableFormat.DELTA)
             .build();
     DeltaConversionSource conversionSource =
         conversionSourceProvider.getConversionSourceInstance(tableConfig);
@@ -506,11 +505,11 @@ public class ITDeltaConversionTargetSource {
     testSparkDeltaTable.insertRows(50);
     allActiveFiles.add(testSparkDeltaTable.getAllActiveFiles());
 
-    PerTableConfig tableConfig =
-        PerTableConfigImpl.builder()
-            .tableName(testSparkDeltaTable.getTableName())
-            .tableBasePath(testSparkDeltaTable.getBasePath())
-            .targetTableFormats(Arrays.asList(TableFormat.HUDI, 
TableFormat.ICEBERG))
+    SourceTable tableConfig =
+        SourceTable.builder()
+            .name(testSparkDeltaTable.getTableName())
+            .basePath(testSparkDeltaTable.getBasePath())
+            .formatName(TableFormat.DELTA)
             .build();
     DeltaConversionSource conversionSource =
         conversionSourceProvider.getConversionSourceInstance(tableConfig);
@@ -563,11 +562,11 @@ public class ITDeltaConversionTargetSource {
     testSparkDeltaTable.insertRowsForPartition(20, partitionValueToDelete);
     allActiveFiles.add(testSparkDeltaTable.getAllActiveFiles());
 
-    PerTableConfig tableConfig =
-        PerTableConfigImpl.builder()
-            .tableName(testSparkDeltaTable.getTableName())
-            .tableBasePath(testSparkDeltaTable.getBasePath())
-            .targetTableFormats(Arrays.asList(TableFormat.HUDI, 
TableFormat.ICEBERG))
+    SourceTable tableConfig =
+        SourceTable.builder()
+            .name(testSparkDeltaTable.getTableName())
+            .basePath(testSparkDeltaTable.getBasePath())
+            .formatName(TableFormat.DELTA)
             .build();
     DeltaConversionSource conversionSource =
         conversionSourceProvider.getConversionSourceInstance(tableConfig);
@@ -623,11 +622,11 @@ public class ITDeltaConversionTargetSource {
     testSparkDeltaTable.insertRows(50);
     allActiveFiles.add(testSparkDeltaTable.getAllActiveFiles());
 
-    PerTableConfig tableConfig =
-        PerTableConfigImpl.builder()
-            .tableName(testSparkDeltaTable.getTableName())
-            .tableBasePath(testSparkDeltaTable.getBasePath())
-            .targetTableFormats(Arrays.asList(TableFormat.HUDI, 
TableFormat.ICEBERG))
+    SourceTable tableConfig =
+        SourceTable.builder()
+            .name(testSparkDeltaTable.getTableName())
+            .basePath(testSparkDeltaTable.getBasePath())
+            .formatName(TableFormat.DELTA)
             .build();
     DeltaConversionSource conversionSource =
         conversionSourceProvider.getConversionSourceInstance(tableConfig);
diff --git 
a/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaSync.java 
b/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaSync.java
index 135ce995..f0f889d2 100644
--- a/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaSync.java
+++ b/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaSync.java
@@ -26,7 +26,9 @@ import static 
org.junit.jupiter.api.parallel.ExecutionMode.SAME_THREAD;
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.time.Duration;
 import java.time.Instant;
+import java.time.temporal.ChronoUnit;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -76,7 +78,7 @@ import io.delta.standalone.expressions.Literal;
 import io.delta.standalone.types.IntegerType;
 import io.delta.standalone.types.StringType;
 
-import org.apache.xtable.conversion.PerTableConfigImpl;
+import org.apache.xtable.conversion.TargetTable;
 import org.apache.xtable.model.InternalSnapshot;
 import org.apache.xtable.model.InternalTable;
 import org.apache.xtable.model.schema.InternalField;
@@ -126,11 +128,11 @@ public class TestDeltaSync {
     Files.createDirectories(basePath);
     conversionTarget =
         new DeltaConversionTarget(
-            PerTableConfigImpl.builder()
-                .tableName(tableName)
-                .tableBasePath(basePath.toString())
-                .targetMetadataRetentionInHours(1)
-                
.targetTableFormats(Collections.singletonList(TableFormat.DELTA))
+            TargetTable.builder()
+                .name(tableName)
+                .basePath(basePath.toString())
+                .metadataRetention(Duration.of(1, ChronoUnit.HOURS))
+                .formatName(TableFormat.DELTA)
                 .build(),
             sparkSession);
   }
diff --git 
a/xtable-core/src/test/java/org/apache/xtable/hudi/ITHudiConversionSourceSource.java
 
b/xtable-core/src/test/java/org/apache/xtable/hudi/ITHudiConversionSourceSource.java
index 408e4373..c074bc23 100644
--- 
a/xtable-core/src/test/java/org/apache/xtable/hudi/ITHudiConversionSourceSource.java
+++ 
b/xtable-core/src/test/java/org/apache/xtable/hudi/ITHudiConversionSourceSource.java
@@ -569,7 +569,7 @@ public class ITHudiConversionSourceSource {
             .build();
     HudiSourcePartitionSpecExtractor partitionSpecExtractor =
         new ConfigurationBasedPartitionSpecExtractor(
-            
HudiSourceConfigImpl.builder().partitionFieldSpecConfig(xTablePartitionConfig).build());
+            
HudiSourceConfig.fromPartitionFieldSpecConfig(xTablePartitionConfig));
     return new HudiConversionSource(hoodieTableMetaClient, 
partitionSpecExtractor);
   }
 
diff --git 
a/xtable-core/src/test/java/org/apache/xtable/hudi/ITHudiConversionSourceTarget.java
 
b/xtable-core/src/test/java/org/apache/xtable/hudi/ITHudiConversionSourceTarget.java
index 7990bfbf..12885567 100644
--- 
a/xtable-core/src/test/java/org/apache/xtable/hudi/ITHudiConversionSourceTarget.java
+++ 
b/xtable-core/src/test/java/org/apache/xtable/hudi/ITHudiConversionSourceTarget.java
@@ -24,6 +24,7 @@ import static 
org.apache.xtable.hudi.HudiTestUtil.initTableAndGetMetaClient;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
 import java.nio.file.Path;
+import java.time.Duration;
 import java.time.Instant;
 import java.time.temporal.ChronoUnit;
 import java.util.Arrays;
@@ -69,7 +70,7 @@ import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.metadata.HoodieBackedTableMetadata;
 import org.apache.hudi.metadata.HoodieMetadataFileSystemView;
 
-import org.apache.xtable.conversion.PerTableConfigImpl;
+import org.apache.xtable.conversion.TargetTable;
 import org.apache.xtable.model.InternalTable;
 import org.apache.xtable.model.metadata.TableSyncMetadata;
 import org.apache.xtable.model.schema.InternalField;
@@ -582,11 +583,11 @@ public class ITHudiConversionSourceTarget {
 
   private HudiConversionTarget getTargetClient() {
     return new HudiConversionTarget(
-        PerTableConfigImpl.builder()
-            .tableBasePath(tableBasePath)
-            .targetTableFormats(Collections.singletonList(TableFormat.HUDI))
-            .tableName("test_table")
-            .targetMetadataRetentionInHours(4)
+        TargetTable.builder()
+            .basePath(tableBasePath)
+            .formatName(TableFormat.HUDI)
+            .name("test_table")
+            .metadataRetention(Duration.of(4, ChronoUnit.HOURS))
             .build(),
         CONFIGURATION,
         3);
diff --git 
a/xtable-core/src/test/java/org/apache/xtable/iceberg/ITIcebergConversionTargetSource.java
 
b/xtable-core/src/test/java/org/apache/xtable/iceberg/ITIcebergConversionTargetSource.java
index 997ab184..3f20ac9d 100644
--- 
a/xtable-core/src/test/java/org/apache/xtable/iceberg/ITIcebergConversionTargetSource.java
+++ 
b/xtable-core/src/test/java/org/apache/xtable/iceberg/ITIcebergConversionTargetSource.java
@@ -29,7 +29,6 @@ import java.nio.file.Path;
 import java.time.Instant;
 import java.time.temporal.ChronoUnit;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 
@@ -46,8 +45,7 @@ import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.data.Record;
 
 import org.apache.xtable.TestIcebergTable;
-import org.apache.xtable.conversion.PerTableConfig;
-import org.apache.xtable.conversion.PerTableConfigImpl;
+import org.apache.xtable.conversion.SourceTable;
 import org.apache.xtable.model.CommitsBacklog;
 import org.apache.xtable.model.InstantsForIncrementalSync;
 import org.apache.xtable.model.InternalSnapshot;
@@ -65,7 +63,7 @@ public class ITIcebergConversionTargetSource {
   @BeforeEach
   void setup() {
     sourceProvider = new IcebergConversionSourceProvider();
-    sourceProvider.init(hadoopConf, null);
+    sourceProvider.init(hadoopConf);
   }
 
   @ParameterizedTest
@@ -97,11 +95,11 @@ public class ITIcebergConversionTargetSource {
       testIcebergTable.insertRows(50);
       allActiveFiles.add(testIcebergTable.getAllActiveFiles());
 
-      PerTableConfig tableConfig =
-          PerTableConfigImpl.builder()
-              .tableName(testIcebergTable.getTableName())
-              .tableBasePath(testIcebergTable.getBasePath())
-              .targetTableFormats(Arrays.asList(TableFormat.HUDI, 
TableFormat.DELTA))
+      SourceTable tableConfig =
+          SourceTable.builder()
+              .name(testIcebergTable.getTableName())
+              .basePath(testIcebergTable.getBasePath())
+              .formatName(TableFormat.ICEBERG)
               .build();
       IcebergConversionSource conversionSource =
           sourceProvider.getConversionSourceInstance(tableConfig);
@@ -157,11 +155,11 @@ public class ITIcebergConversionTargetSource {
       testIcebergTable.insertRecordsForPartition(20, partitionValueToDelete);
       allActiveFiles.add(testIcebergTable.getAllActiveFiles());
 
-      PerTableConfig tableConfig =
-          PerTableConfigImpl.builder()
-              .tableName(testIcebergTable.getTableName())
-              .tableBasePath(testIcebergTable.getBasePath())
-              .targetTableFormats(Arrays.asList(TableFormat.HUDI, 
TableFormat.DELTA))
+      SourceTable tableConfig =
+          SourceTable.builder()
+              .name(testIcebergTable.getTableName())
+              .basePath(testIcebergTable.getBasePath())
+              .formatName(TableFormat.ICEBERG)
               .build();
       IcebergConversionSource conversionSource =
           sourceProvider.getConversionSourceInstance(tableConfig);
@@ -217,11 +215,11 @@ public class ITIcebergConversionTargetSource {
       testIcebergTable.insertRecordsForPartition(20, partitionValueToDelete);
       allActiveFiles.add(testIcebergTable.getAllActiveFiles());
 
-      PerTableConfig tableConfig =
-          PerTableConfigImpl.builder()
-              .tableName(testIcebergTable.getTableName())
-              .tableBasePath(testIcebergTable.getBasePath())
-              .targetTableFormats(Arrays.asList(TableFormat.HUDI, 
TableFormat.DELTA))
+      SourceTable tableConfig =
+          SourceTable.builder()
+              .name(testIcebergTable.getTableName())
+              .basePath(testIcebergTable.getBasePath())
+              .formatName(TableFormat.ICEBERG)
               .build();
       IcebergConversionSource conversionSource =
           sourceProvider.getConversionSourceInstance(tableConfig);
@@ -277,11 +275,11 @@ public class ITIcebergConversionTargetSource {
       testIcebergTable.insertRows(50);
       allActiveFiles.add(testIcebergTable.getAllActiveFiles());
 
-      PerTableConfig tableConfig =
-          PerTableConfigImpl.builder()
-              .tableName(testIcebergTable.getTableName())
-              .tableBasePath(testIcebergTable.getBasePath())
-              .targetTableFormats(Arrays.asList(TableFormat.HUDI, 
TableFormat.DELTA))
+      SourceTable tableConfig =
+          SourceTable.builder()
+              .name(testIcebergTable.getTableName())
+              .basePath(testIcebergTable.getBasePath())
+              .formatName(TableFormat.ICEBERG)
               .build();
       IcebergConversionSource conversionSource =
           sourceProvider.getConversionSourceInstance(tableConfig);
@@ -318,12 +316,11 @@ public class ITIcebergConversionTargetSource {
       // Insert 50 rows to INFO partition.
       List<Record> commit1Rows = 
testIcebergTable.insertRecordsForPartition(50, "INFO");
       Long timestamp1 = testIcebergTable.getLastCommitTimestamp();
-      PerTableConfig tableConfig =
-          PerTableConfigImpl.builder()
-              .tableName(testIcebergTable.getTableName())
-              .tableBasePath(testIcebergTable.getBasePath())
-              .tableDataPath(testIcebergTable.getDataPath())
-              .targetTableFormats(Arrays.asList(TableFormat.HUDI, 
TableFormat.DELTA))
+      SourceTable tableConfig =
+          SourceTable.builder()
+              .name(testIcebergTable.getTableName())
+              .basePath(testIcebergTable.getBasePath())
+              .formatName(TableFormat.ICEBERG)
               .build();
 
       // Upsert all rows inserted before, so all files are replaced.
diff --git 
a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergConversionTargetSource.java
 
b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergConversionTargetSource.java
index d100a313..784d6585 100644
--- 
a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergConversionTargetSource.java
+++ 
b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergConversionTargetSource.java
@@ -26,7 +26,6 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.time.Instant;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
@@ -48,8 +47,7 @@ import org.apache.iceberg.io.DataWriter;
 import org.apache.iceberg.parquet.Parquet;
 import org.apache.iceberg.types.Types;
 
-import org.apache.xtable.conversion.PerTableConfig;
-import org.apache.xtable.conversion.PerTableConfigImpl;
+import org.apache.xtable.conversion.SourceTable;
 import org.apache.xtable.model.CommitsBacklog;
 import org.apache.xtable.model.InstantsForIncrementalSync;
 import org.apache.xtable.model.InternalSnapshot;
@@ -77,7 +75,7 @@ class TestIcebergConversionTargetSource {
     hadoopConf.set("fs.defaultFS", "file:///");
 
     sourceProvider = new IcebergConversionSourceProvider();
-    sourceProvider.init(hadoopConf, null);
+    sourceProvider.init(hadoopConf);
 
     tableManager = IcebergTableManager.of(hadoopConf);
 
@@ -91,7 +89,7 @@ class TestIcebergConversionTargetSource {
   @Test
   void getTableTest(@TempDir Path workingDir) throws IOException {
     Table catalogSales = createTestTableWithData(workingDir.toString());
-    PerTableConfig sourceTableConfig = getPerTableConfig(catalogSales);
+    SourceTable sourceTableConfig = getPerTableConfig(catalogSales);
 
     IcebergConversionSource conversionSource =
         sourceProvider.getConversionSourceInstance(sourceTableConfig);
@@ -123,7 +121,7 @@ class TestIcebergConversionTargetSource {
     Table catalogSales = createTestTableWithData(workingDir.toString());
     Snapshot iceCurrentSnapshot = catalogSales.currentSnapshot();
 
-    PerTableConfig sourceTableConfig = getPerTableConfig(catalogSales);
+    SourceTable sourceTableConfig = getPerTableConfig(catalogSales);
 
     IcebergDataFileExtractor spyDataFileExtractor = 
spy(IcebergDataFileExtractor.builder().build());
     IcebergPartitionValueConverter spyPartitionConverter =
@@ -384,7 +382,7 @@ class TestIcebergConversionTargetSource {
   }
 
   private IcebergConversionSource getIcebergConversionSource(Table 
catalogSales) {
-    PerTableConfig tableConfig = getPerTableConfig(catalogSales);
+    SourceTable tableConfig = getPerTableConfig(catalogSales);
 
     return IcebergConversionSource.builder()
         .hadoopConf(hadoopConf)
@@ -392,11 +390,11 @@ class TestIcebergConversionTargetSource {
         .build();
   }
 
-  private static PerTableConfig getPerTableConfig(Table catalogSales) {
-    return PerTableConfigImpl.builder()
-        .tableName(catalogSales.name())
-        .tableBasePath(catalogSales.location())
-        .targetTableFormats(Collections.singletonList(TableFormat.DELTA))
+  private static SourceTable getPerTableConfig(Table catalogSales) {
+    return SourceTable.builder()
+        .name(catalogSales.name())
+        .basePath(catalogSales.location())
+        .formatName(TableFormat.ICEBERG)
         .build();
   }
 
diff --git 
a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergSync.java 
b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergSync.java
index 45140b44..bd36dde9 100644
--- a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergSync.java
+++ b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergSync.java
@@ -37,7 +37,9 @@ import java.net.URI;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.time.Duration;
 import java.time.Instant;
+import java.time.temporal.ChronoUnit;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -81,7 +83,7 @@ import com.google.common.collect.Iterables;
 import com.google.common.collect.Sets;
 
 import org.apache.xtable.ITConversionController;
-import org.apache.xtable.conversion.PerTableConfigImpl;
+import org.apache.xtable.conversion.TargetTable;
 import org.apache.xtable.model.InternalSnapshot;
 import org.apache.xtable.model.InternalTable;
 import org.apache.xtable.model.metadata.TableSyncMetadata;
@@ -189,11 +191,11 @@ public class TestIcebergSync {
 
   private IcebergConversionTarget getConversionTarget() {
     return new IcebergConversionTarget(
-        PerTableConfigImpl.builder()
-            .tableBasePath(basePath.toString())
-            .tableName(tableName)
-            .targetMetadataRetentionInHours(1)
-            .targetTableFormats(Collections.singletonList(TableFormat.ICEBERG))
+        TargetTable.builder()
+            .basePath(basePath.toString())
+            .name(tableName)
+            .metadataRetention(Duration.of(1, ChronoUnit.HOURS))
+            .formatName(TableFormat.ICEBERG)
             .build(),
         CONFIGURATION,
         mockSchemaExtractor,
diff --git a/xtable-core/src/test/java/org/apache/xtable/loadtest/LoadTest.java 
b/xtable-core/src/test/java/org/apache/xtable/loadtest/LoadTest.java
index 8b1a24e0..341b2cb0 100644
--- a/xtable-core/src/test/java/org/apache/xtable/loadtest/LoadTest.java
+++ b/xtable-core/src/test/java/org/apache/xtable/loadtest/LoadTest.java
@@ -18,9 +18,12 @@
  
 package org.apache.xtable.loadtest;
 
+import static 
org.apache.xtable.hudi.HudiSourceConfig.PARTITION_FIELD_SPEC_CONFIG;
+
 import java.nio.file.Path;
 import java.util.Arrays;
-import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
@@ -34,11 +37,13 @@ import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.config.HoodieArchivalConfig;
 
+import org.apache.xtable.GenericTable;
 import org.apache.xtable.TestJavaHudiTable;
+import org.apache.xtable.conversion.ConversionConfig;
 import org.apache.xtable.conversion.ConversionController;
 import org.apache.xtable.conversion.ConversionSourceProvider;
-import org.apache.xtable.conversion.PerTableConfig;
-import org.apache.xtable.conversion.PerTableConfigImpl;
+import org.apache.xtable.conversion.SourceTable;
+import org.apache.xtable.conversion.TargetTable;
 import org.apache.xtable.hudi.HudiConversionSourceProvider;
 import org.apache.xtable.model.storage.TableFormat;
 import org.apache.xtable.model.sync.SyncMode;
@@ -56,7 +61,7 @@ public class LoadTest {
   @BeforeEach
   public void setup() {
     hudiConversionSourceProvider = new HudiConversionSourceProvider();
-    hudiConversionSourceProvider.init(CONFIGURATION, Collections.emptyMap());
+    hudiConversionSourceProvider.init(CONFIGURATION);
   }
 
   @Test
@@ -75,16 +80,15 @@ public class LoadTest {
                 .collect(Collectors.toList()),
             false);
       }
-      PerTableConfig perTableConfig =
-          PerTableConfigImpl.builder()
-              .tableName(tableName)
-              .targetTableFormats(Arrays.asList(TableFormat.ICEBERG, 
TableFormat.DELTA))
-              .tableBasePath(table.getBasePath())
-              .syncMode(SyncMode.FULL)
-              .build();
+      ConversionConfig conversionConfig =
+          getTableSyncConfig(
+              SyncMode.FULL,
+              tableName,
+              table,
+              Arrays.asList(TableFormat.ICEBERG, TableFormat.DELTA));
       ConversionController conversionController = new 
ConversionController(CONFIGURATION);
       long start = System.currentTimeMillis();
-      conversionController.sync(perTableConfig, hudiConversionSourceProvider);
+      conversionController.sync(conversionConfig, 
hudiConversionSourceProvider);
       long end = System.currentTimeMillis();
       System.out.println("Full sync took " + (end - start) + "ms");
     }
@@ -104,16 +108,15 @@ public class LoadTest {
         TestJavaHudiTable.forStandardSchema(
             tableName, tempDir, "level:SIMPLE", HoodieTableType.COPY_ON_WRITE, 
archivalConfig)) {
       table.insertRecords(1, "partition0", false);
-      PerTableConfig perTableConfig =
-          PerTableConfigImpl.builder()
-              .tableName(tableName)
-              .targetTableFormats(Arrays.asList(TableFormat.ICEBERG, 
TableFormat.DELTA))
-              .tableBasePath(table.getBasePath())
-              .syncMode(SyncMode.INCREMENTAL)
-              .build();
+      ConversionConfig conversionConfig =
+          getTableSyncConfig(
+              SyncMode.INCREMENTAL,
+              tableName,
+              table,
+              Arrays.asList(TableFormat.ICEBERG, TableFormat.DELTA));
       // sync once to establish first commit
       ConversionController conversionController = new 
ConversionController(CONFIGURATION);
-      conversionController.sync(perTableConfig, hudiConversionSourceProvider);
+      conversionController.sync(conversionConfig, 
hudiConversionSourceProvider);
       for (int i = 0; i < numCommits; i++) {
         table.insertRecords(
             1,
@@ -124,9 +127,40 @@ public class LoadTest {
       }
 
       long start = System.currentTimeMillis();
-      conversionController.sync(perTableConfig, hudiConversionSourceProvider);
+      conversionController.sync(conversionConfig, 
hudiConversionSourceProvider);
       long end = System.currentTimeMillis();
       System.out.println("Incremental sync took " + (end - start) + "ms");
     }
   }
+
+  private static ConversionConfig getTableSyncConfig(
+      SyncMode syncMode, String tableName, GenericTable table, List<String> 
targetTableFormats) {
+    Properties sourceProperties = new Properties();
+    sourceProperties.put(PARTITION_FIELD_SPEC_CONFIG, "level:VALUE");
+    SourceTable sourceTable =
+        SourceTable.builder()
+            .name(tableName)
+            .formatName(TableFormat.HUDI)
+            .basePath(table.getBasePath())
+            .dataPath(table.getDataPath())
+            .additionalProperties(sourceProperties)
+            .build();
+
+    List<TargetTable> targetTables =
+        targetTableFormats.stream()
+            .map(
+                formatName ->
+                    TargetTable.builder()
+                        .name(tableName)
+                        .formatName(formatName)
+                        .basePath(table.getBasePath())
+                        .build())
+            .collect(Collectors.toList());
+
+    return ConversionConfig.builder()
+        .sourceTable(sourceTable)
+        .targetTables(targetTables)
+        .syncMode(syncMode)
+        .build();
+  }
 }
diff --git 
a/xtable-hudi-support/xtable-hudi-support-extensions/src/main/java/org/apache/xtable/hudi/sync/XTableSyncConfig.java
 
b/xtable-hudi-support/xtable-hudi-support-extensions/src/main/java/org/apache/xtable/hudi/sync/XTableSyncConfig.java
index 67b00417..4d878cea 100644
--- 
a/xtable-hudi-support/xtable-hudi-support-extensions/src/main/java/org/apache/xtable/hudi/sync/XTableSyncConfig.java
+++ 
b/xtable-hudi-support/xtable-hudi-support-extensions/src/main/java/org/apache/xtable/hudi/sync/XTableSyncConfig.java
@@ -26,12 +26,12 @@ import org.apache.hudi.sync.common.HoodieSyncConfig;
 
 public class XTableSyncConfig extends HoodieSyncConfig implements Serializable 
{
 
-  public static final ConfigProperty<String> ONE_TABLE_FORMATS =
+  public static final ConfigProperty<String> XTABLE_FORMATS =
       ConfigProperty.key("hoodie.xtable.formats.to.sync")
           .defaultValue("DELTA,ICEBERG")
           .withDocumentation("Comma separated list of formats to sync.");
 
-  public static final ConfigProperty<Integer> 
ONE_TABLE_TARGET_METADATA_RETENTION_HOURS =
+  public static final ConfigProperty<Integer> 
XTABLE_TARGET_METADATA_RETENTION_HOURS =
       ConfigProperty.key("hoodie.xtable.target.metadata.retention.hr")
           .defaultValue(24 * 7)
           .withDocumentation("Retention in hours for metadata in target 
table.");
diff --git 
a/xtable-hudi-support/xtable-hudi-support-extensions/src/main/java/org/apache/xtable/hudi/sync/XTableSyncTool.java
 
b/xtable-hudi-support/xtable-hudi-support-extensions/src/main/java/org/apache/xtable/hudi/sync/XTableSyncTool.java
index 49a7a743..a9653eb5 100644
--- 
a/xtable-hudi-support/xtable-hudi-support-extensions/src/main/java/org/apache/xtable/hudi/sync/XTableSyncTool.java
+++ 
b/xtable-hudi-support/xtable-hudi-support-extensions/src/main/java/org/apache/xtable/hudi/sync/XTableSyncTool.java
@@ -18,8 +18,11 @@
  
 package org.apache.xtable.hudi.sync;
 
+import static 
org.apache.xtable.hudi.HudiSourceConfig.PARTITION_FIELD_SPEC_CONFIG;
+import static org.apache.xtable.model.storage.TableFormat.HUDI;
+
+import java.time.Duration;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -34,11 +37,11 @@ import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
 import org.apache.hudi.sync.common.HoodieSyncConfig;
 import org.apache.hudi.sync.common.HoodieSyncTool;
 
+import org.apache.xtable.conversion.ConversionConfig;
 import org.apache.xtable.conversion.ConversionController;
-import org.apache.xtable.conversion.PerTableConfig;
-import org.apache.xtable.conversion.PerTableConfigImpl;
+import org.apache.xtable.conversion.SourceTable;
+import org.apache.xtable.conversion.TargetTable;
 import org.apache.xtable.hudi.HudiConversionSourceProvider;
-import org.apache.xtable.hudi.HudiSourceConfigImpl;
 import org.apache.xtable.model.schema.PartitionTransformType;
 import org.apache.xtable.model.sync.SyncMode;
 import org.apache.xtable.model.sync.SyncResult;
@@ -55,39 +58,57 @@ public class XTableSyncTool extends HoodieSyncTool {
     super(props, hadoopConf);
     this.config = new XTableSyncConfig(props);
     this.hudiConversionSourceProvider = new HudiConversionSourceProvider();
-    hudiConversionSourceProvider.init(hadoopConf, Collections.emptyMap());
+    hudiConversionSourceProvider.init(hadoopConf);
   }
 
   @Override
   public void syncHoodieTable() {
     List<String> formatsToSync =
-        
Arrays.stream(config.getString(XTableSyncConfig.ONE_TABLE_FORMATS).split(","))
+        
Arrays.stream(config.getString(XTableSyncConfig.XTABLE_FORMATS).split(","))
             .map(format -> format.toUpperCase())
             .collect(Collectors.toList());
     String basePath = config.getString(HoodieSyncConfig.META_SYNC_BASE_PATH);
     String tableName = 
config.getString(HoodieTableConfig.HOODIE_TABLE_NAME_KEY);
-    PerTableConfig perTableConfig =
-        PerTableConfigImpl.builder()
-            .tableName(tableName)
-            .tableBasePath(basePath)
-            .targetTableFormats(formatsToSync)
-            .hudiSourceConfig(
-                HudiSourceConfigImpl.builder()
-                    .partitionFieldSpecConfig(getPartitionSpecConfig())
-                    .build())
+    Properties sourceProperties = new Properties();
+    sourceProperties.put(PARTITION_FIELD_SPEC_CONFIG, 
getPartitionSpecConfig());
+    SourceTable sourceTable =
+        SourceTable.builder()
+            .name(tableName)
+            .formatName(HUDI)
+            .basePath(basePath)
+            .additionalProperties(sourceProperties)
+            .build();
+    Duration metadataRetention =
+        
config.contains(XTableSyncConfig.XTABLE_TARGET_METADATA_RETENTION_HOURS)
+            ? Duration.ofHours(
+                
config.getInt(XTableSyncConfig.XTABLE_TARGET_METADATA_RETENTION_HOURS))
+            : null;
+    List<TargetTable> targetTables =
+        formatsToSync.stream()
+            .map(
+                format ->
+                    TargetTable.builder()
+                        .basePath(basePath)
+                        .metadataRetention(metadataRetention)
+                        .formatName(format)
+                        .name(tableName)
+                        .build())
+            .collect(Collectors.toList());
+    ConversionConfig conversionConfig =
+        ConversionConfig.builder()
+            .sourceTable(sourceTable)
+            .targetTables(targetTables)
             .syncMode(SyncMode.INCREMENTAL)
-            .targetMetadataRetentionInHours(
-                
config.getInt(XTableSyncConfig.ONE_TABLE_TARGET_METADATA_RETENTION_HOURS))
             .build();
     Map<String, SyncResult> results =
-        new ConversionController(hadoopConf).sync(perTableConfig, 
hudiConversionSourceProvider);
+        new ConversionController(hadoopConf).sync(conversionConfig, 
hudiConversionSourceProvider);
     String failingFormats =
         results.entrySet().stream()
             .filter(
                 entry ->
                     entry.getValue().getStatus().getStatusCode()
                         != SyncResult.SyncStatusCode.SUCCESS)
-            .map(entry -> entry.getKey().toString())
+            .map(Map.Entry::getKey)
             .collect(Collectors.joining(","));
     if (!failingFormats.isEmpty()) {
       throw new HoodieException("Unable to sync to InternalTable for formats: 
" + failingFormats);
diff --git 
a/xtable-hudi-support/xtable-hudi-support-extensions/src/test/java/org/apache/xtable/hudi/sync/TestXTableSyncTool.java
 
b/xtable-hudi-support/xtable-hudi-support-extensions/src/test/java/org/apache/xtable/hudi/sync/TestXTableSyncTool.java
index d44465c8..4024674b 100644
--- 
a/xtable-hudi-support/xtable-hudi-support-extensions/src/test/java/org/apache/xtable/hudi/sync/TestXTableSyncTool.java
+++ 
b/xtable-hudi-support/xtable-hudi-support-extensions/src/test/java/org/apache/xtable/hudi/sync/TestXTableSyncTool.java
@@ -98,7 +98,7 @@ public class TestXTableSyncTool {
     writeBasicHudiTable(path, options);
 
     Properties properties = new Properties();
-    properties.put(XTableSyncConfig.ONE_TABLE_FORMATS.key(), "iceberg,DELTA");
+    properties.put(XTableSyncConfig.XTABLE_FORMATS.key(), "iceberg,DELTA");
     properties.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), 
partitionPath);
     properties.put(HoodieSyncConfig.META_SYNC_BASE_PATH.key(), path);
     properties.putAll(options);
diff --git 
a/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunSync.java 
b/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunSync.java
index 7fb65c42..c84753de 100644
--- a/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunSync.java
+++ b/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunSync.java
@@ -25,6 +25,8 @@ import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
+import java.util.stream.Collectors;
 
 import lombok.Data;
 import lombok.extern.log4j.Log4j2;
@@ -44,17 +46,16 @@ import com.fasterxml.jackson.databind.ObjectReader;
 import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
 import com.google.common.annotations.VisibleForTesting;
 
+import org.apache.xtable.conversion.ConversionConfig;
 import org.apache.xtable.conversion.ConversionController;
 import org.apache.xtable.conversion.ConversionSourceProvider;
-import org.apache.xtable.conversion.PerTableConfig;
-import org.apache.xtable.conversion.PerTableConfigImpl;
-import org.apache.xtable.hudi.ConfigurationBasedPartitionSpecExtractor;
-import org.apache.xtable.hudi.HudiSourceConfigImpl;
+import org.apache.xtable.conversion.SourceTable;
+import org.apache.xtable.conversion.TargetTable;
+import org.apache.xtable.hudi.HudiSourceConfig;
 import org.apache.xtable.iceberg.IcebergCatalogConfig;
 import org.apache.xtable.model.storage.TableFormat;
 import org.apache.xtable.model.sync.SyncMode;
 import org.apache.xtable.reflection.ReflectionUtils;
-import 
org.apache.xtable.utilities.RunSync.TableFormatConverters.ConversionConfig;
 
 /**
  * Provides a standalone runner for the sync process. See README.md for more 
details on how to run
@@ -129,7 +130,7 @@ public class RunSync {
     String sourceFormat = datasetConfig.sourceFormat;
     customConfig = getCustomConfigurations(cmd, CONVERTERS_CONFIG_PATH);
     TableFormatConverters tableFormatConverters = 
loadTableFormatConversionConfigs(customConfig);
-    ConversionConfig sourceConversionConfig =
+    TableFormatConverters.ConversionConfig sourceConversionConfig =
         tableFormatConverters.getTableFormatConverters().get(sourceFormat);
     if (sourceConversionConfig == null) {
       throw new IllegalArgumentException(
@@ -140,7 +141,7 @@ public class RunSync {
     String sourceProviderClass = 
sourceConversionConfig.conversionSourceProviderClass;
     ConversionSourceProvider<?> conversionSourceProvider =
         ReflectionUtils.createInstanceOfClass(sourceProviderClass);
-    conversionSourceProvider.init(hadoopConf, 
sourceConversionConfig.configuration);
+    conversionSourceProvider.init(hadoopConf);
 
     List<String> tableFormatList = datasetConfig.getTargetFormats();
     ConversionController conversionController = new 
ConversionController(hadoopConf);
@@ -149,24 +150,45 @@ public class RunSync {
           "Running sync for basePath {} for following table formats {}",
           table.getTableBasePath(),
           tableFormatList);
-      PerTableConfig config =
-          PerTableConfigImpl.builder()
-              .tableBasePath(table.getTableBasePath())
-              .tableName(table.getTableName())
+      Properties sourceProperties = new Properties();
+      if (table.getPartitionSpec() != null) {
+        sourceProperties.put(
+            HudiSourceConfig.PARTITION_FIELD_SPEC_CONFIG, 
table.getPartitionSpec());
+      }
+      SourceTable sourceTable =
+          SourceTable.builder()
+              .name(table.getTableName())
+              .basePath(table.getTableBasePath())
               .namespace(table.getNamespace() == null ? null : 
table.getNamespace().split("\\."))
-              .tableDataPath(table.getTableDataPath())
-              .icebergCatalogConfig(icebergCatalogConfig)
-              .hudiSourceConfig(
-                  HudiSourceConfigImpl.builder()
-                      .partitionSpecExtractorClass(
-                          
ConfigurationBasedPartitionSpecExtractor.class.getName())
-                      .partitionFieldSpecConfig(table.getPartitionSpec())
-                      .build())
-              .targetTableFormats(tableFormatList)
+              .dataPath(table.getTableDataPath())
+              .catalogConfig(icebergCatalogConfig)
+              .additionalProperties(sourceProperties)
+              .formatName(sourceFormat)
+              .build();
+      List<TargetTable> targetTables =
+          tableFormatList.stream()
+              .map(
+                  tableFormat ->
+                      TargetTable.builder()
+                          .name(table.getTableName())
+                          .basePath(table.getTableBasePath())
+                          .namespace(
+                              table.getNamespace() == null
+                                  ? null
+                                  : table.getNamespace().split("\\."))
+                          .catalogConfig(icebergCatalogConfig)
+                          .formatName(tableFormat)
+                          .build())
+              .collect(Collectors.toList());
+
+      ConversionConfig conversionConfig =
+          ConversionConfig.builder()
+              .sourceTable(sourceTable)
+              .targetTables(targetTables)
               .syncMode(SyncMode.INCREMENTAL)
               .build();
       try {
-        conversionController.sync(config, conversionSourceProvider);
+        conversionController.sync(conversionConfig, conversionSourceProvider);
       } catch (Exception e) {
         log.error(String.format("Error running sync for %s", 
table.getTableBasePath()), e);
       }


Reply via email to