This is an automated email from the ASF dual-hosted git repository.

timbrown pushed a commit to branch 297-properties-based-config-2
in repository https://gitbox.apache.org/repos/asf/incubator-xtable.git

commit 08c31514de18643bf0b1908acef889d276dc27a7
Author: Timothy Brown <t...@onehouse.ai>
AuthorDate: Fri Aug 2 14:34:36 2024 -0400

    PR feedback
---
 .../apache/xtable/conversion/ExternalTable.java    | 41 ++++++++++++++++++----
 .../org/apache/xtable/conversion/SourceTable.java  | 16 ++++++---
 .../apache/xtable/conversion/TableSyncConfig.java  |  6 +---
 .../org/apache/xtable/conversion/TargetTable.java  | 10 +++---
 .../xtable/conversion/TestExternalTable.java       | 18 +++++-----
 .../apache/xtable/conversion/TestSourceTable.java  | 10 +++---
 .../apache/xtable/conversion/TestTargetTable.java  |  4 +--
 .../xtable/conversion/ConversionController.java    |  3 +-
 .../conversion/ConversionSourceProvider.java       |  4 +--
 .../delta/DeltaConversionSourceProvider.java       |  7 ++--
 .../apache/xtable/delta/DeltaConversionTarget.java |  4 +--
 .../xtable/hudi/HudiConversionSourceProvider.java  |  8 ++---
 .../apache/xtable/hudi/HudiConversionTarget.java   |  8 ++---
 .../org/apache/xtable/hudi/HudiSourceConfig.java   |  7 ++--
 .../xtable/iceberg/IcebergConversionSource.java    |  2 +-
 .../iceberg/IcebergConversionSourceProvider.java   |  5 +--
 .../xtable/iceberg/IcebergConversionTarget.java    |  2 +-
 .../org/apache/xtable/ITConversionController.java  |  9 +++--
 .../conversion/TestConversionController.java       | 10 +++---
 .../xtable/conversion/TestTableSyncConfig.java     |  1 -
 .../delta/ITDeltaConversionTargetSource.java       | 38 ++++++++++----------
 .../org/apache/xtable/delta/TestDeltaSync.java     |  2 +-
 .../xtable/hudi/ITHudiConversionSourceTarget.java  |  2 +-
 .../iceberg/ITIcebergConversionTargetSource.java   | 21 ++++++-----
 .../iceberg/TestIcebergConversionTargetSource.java |  3 +-
 .../org/apache/xtable/iceberg/TestIcebergSync.java |  2 +-
 .../java/org/apache/xtable/loadtest/LoadTest.java  |  9 +++--
 .../java/org/apache/xtable/utilities/RunSync.java  |  9 +++--
 28 files changed, 145 insertions(+), 116 deletions(-)

diff --git 
a/xtable-api/src/main/java/org/apache/xtable/conversion/ExternalTable.java 
b/xtable-api/src/main/java/org/apache/xtable/conversion/ExternalTable.java
index eeb690d2..f6580c4b 100644
--- a/xtable-api/src/main/java/org/apache/xtable/conversion/ExternalTable.java
+++ b/xtable-api/src/main/java/org/apache/xtable/conversion/ExternalTable.java
@@ -26,26 +26,53 @@ import org.apache.hadoop.fs.Path;
 
 import com.google.common.base.Preconditions;
 
+import java.util.Properties;
+
+/**
+ * Defines a reference to a table in a particular format.
+ */
 @Getter
 @EqualsAndHashCode
 class ExternalTable {
-  @NonNull String name;
-  @NonNull String formatName;
-  @NonNull String metadataPath;
-  String[] namespace;
-  CatalogConfig catalogConfig;
+  /**
+   * The name of the table.
+   */
+  protected final @NonNull String name;
+  /**
+   * The format of the table (e.g. DELTA, ICEBERG, HUDI)
+   */
+  protected final @NonNull String formatName;
+  /**
+   * The path to the root of the table or the metadata directory depending on 
the format
+   */
+  protected final @NonNull String basePath;
+  /**
+   * Optional namespace for the table
+   */
+  protected final String[] namespace;
+  /**
+   * The configuration for interacting with the catalog that manages this table
+   */
+  protected final CatalogConfig catalogConfig;
+
+  /**
+   * Optional, additional properties that can be used to define interactions 
with the table
+   */
+  protected final Properties additionalProperties;
 
   ExternalTable(
       @NonNull String name,
       @NonNull String formatName,
       @NonNull String basePath,
       String[] namespace,
-      CatalogConfig catalogConfig) {
+      CatalogConfig catalogConfig,
+      Properties additionalProperties) {
     this.name = name;
     this.formatName = formatName;
-    this.metadataPath = sanitizeBasePath(basePath);
+    this.basePath = sanitizeBasePath(basePath);
     this.namespace = namespace;
     this.catalogConfig = catalogConfig;
+    this.additionalProperties = additionalProperties;
   }
 
   protected String sanitizeBasePath(String tableBasePath) {
diff --git 
a/xtable-api/src/main/java/org/apache/xtable/conversion/SourceTable.java 
b/xtable-api/src/main/java/org/apache/xtable/conversion/SourceTable.java
index b219cf12..37224476 100644
--- a/xtable-api/src/main/java/org/apache/xtable/conversion/SourceTable.java
+++ b/xtable-api/src/main/java/org/apache/xtable/conversion/SourceTable.java
@@ -23,20 +23,26 @@ import lombok.EqualsAndHashCode;
 import lombok.Getter;
 import lombok.NonNull;
 
+import java.util.Properties;
+
 @EqualsAndHashCode(callSuper = true)
 @Getter
 public class SourceTable extends ExternalTable {
-  @NonNull String dataPath;
+  /**
+   * The path to the data files, defaults to the basePath
+   */
+  @NonNull private final String dataPath;
 
   @Builder(toBuilder = true)
   public SourceTable(
       String name,
       String formatName,
-      String metadataPath,
+      String basePath,
       String dataPath,
       String[] namespace,
-      CatalogConfig catalogConfig) {
-    super(name, formatName, metadataPath, namespace, catalogConfig);
-    this.dataPath = dataPath == null ? this.metadataPath : 
sanitizeBasePath(dataPath);
+      CatalogConfig catalogConfig,
+      Properties additionalProperties) {
+    super(name, formatName, basePath, namespace, catalogConfig, 
additionalProperties);
+    this.dataPath = dataPath == null ? this.getBasePath() : 
sanitizeBasePath(dataPath);
   }
 }
diff --git 
a/xtable-api/src/main/java/org/apache/xtable/conversion/TableSyncConfig.java 
b/xtable-api/src/main/java/org/apache/xtable/conversion/TableSyncConfig.java
index a56ec300..81ba99c3 100644
--- a/xtable-api/src/main/java/org/apache/xtable/conversion/TableSyncConfig.java
+++ b/xtable-api/src/main/java/org/apache/xtable/conversion/TableSyncConfig.java
@@ -38,21 +38,17 @@ public class TableSyncConfig {
   List<TargetTable> targetTables;
   // The mode, incremental or snapshot
   SyncMode syncMode;
-  // Additional properties to be used when initializing the conversion source
-  Map<String, String> properties;
 
   @Builder
   TableSyncConfig(
       @NonNull SourceTable sourceTable,
       List<TargetTable> targetTables,
-      SyncMode syncMode,
-      Map<String, String> properties) {
+      SyncMode syncMode) {
     this.sourceTable = sourceTable;
     this.targetTables = targetTables;
     Preconditions.checkArgument(
         targetTables != null && !targetTables.isEmpty(),
         "Please provide at-least one format to sync");
     this.syncMode = syncMode == null ? SyncMode.INCREMENTAL : syncMode;
-    this.properties = properties == null ? Collections.emptyMap() : properties;
   }
 }
diff --git 
a/xtable-api/src/main/java/org/apache/xtable/conversion/TargetTable.java 
b/xtable-api/src/main/java/org/apache/xtable/conversion/TargetTable.java
index 067eb8a9..6256da2c 100644
--- a/xtable-api/src/main/java/org/apache/xtable/conversion/TargetTable.java
+++ b/xtable-api/src/main/java/org/apache/xtable/conversion/TargetTable.java
@@ -20,6 +20,7 @@ package org.apache.xtable.conversion;
 
 import java.time.Duration;
 import java.time.temporal.ChronoUnit;
+import java.util.Properties;
 
 import lombok.Builder;
 import lombok.EqualsAndHashCode;
@@ -28,17 +29,18 @@ import lombok.Getter;
 @Getter
 @EqualsAndHashCode(callSuper = true)
 public class TargetTable extends ExternalTable {
-  Duration metadataRetention;
+  private final Duration metadataRetention;
 
   @Builder(toBuilder = true)
   public TargetTable(
       String name,
       String formatName,
-      String metadataPath,
+      String basePath,
       String[] namespace,
       CatalogConfig catalogConfig,
-      Duration metadataRetention) {
-    super(name, formatName, metadataPath, namespace, catalogConfig);
+      Duration metadataRetention,
+      Properties additionalProperties) {
+    super(name, formatName, basePath, namespace, catalogConfig, 
additionalProperties);
     this.metadataRetention =
         metadataRetention == null ? Duration.of(7, ChronoUnit.DAYS) : 
metadataRetention;
   }
diff --git 
a/xtable-api/src/test/java/org/apache/xtable/conversion/TestExternalTable.java 
b/xtable-api/src/test/java/org/apache/xtable/conversion/TestExternalTable.java
index 6345baf0..6bc42da4 100644
--- 
a/xtable-api/src/test/java/org/apache/xtable/conversion/TestExternalTable.java
+++ 
b/xtable-api/src/test/java/org/apache/xtable/conversion/TestExternalTable.java
@@ -27,29 +27,29 @@ public class TestExternalTable {
   @Test
   void sanitizePath() {
     ExternalTable tooManySlashes =
-        new ExternalTable("name", "hudi", "s3://bucket//path", null, null);
-    assertEquals("s3://bucket/path", tooManySlashes.getMetadataPath());
+        new ExternalTable("name", "hudi", "s3://bucket//path", null, null, 
null);
+    assertEquals("s3://bucket/path", tooManySlashes.getBasePath());
 
     ExternalTable localFilePath =
-        new ExternalTable("name", "hudi", "/local/data//path", null, null);
-    assertEquals("file:///local/data/path", localFilePath.getMetadataPath());
+        new ExternalTable("name", "hudi", "/local/data//path", null, null, 
null);
+    assertEquals("file:///local/data/path", localFilePath.getBasePath());
 
     ExternalTable properLocalFilePath =
-        new ExternalTable("name", "hudi", "file:///local/data//path", null, 
null);
-    assertEquals("file:///local/data/path", 
properLocalFilePath.getMetadataPath());
+        new ExternalTable("name", "hudi", "file:///local/data//path", null, 
null, null);
+    assertEquals("file:///local/data/path", properLocalFilePath.getBasePath());
   }
 
   @Test
   void errorIfRequiredArgsNotSet() {
     assertThrows(
-        NullPointerException.class, () -> new ExternalTable("name", "hudi", 
null, null, null));
+        NullPointerException.class, () -> new ExternalTable("name", "hudi", 
null, null, null, null));
 
     assertThrows(
         NullPointerException.class,
-        () -> new ExternalTable("name", null, "file://bucket/path", null, 
null));
+        () -> new ExternalTable("name", null, "file://bucket/path", null, 
null, null));
 
     assertThrows(
         NullPointerException.class,
-        () -> new ExternalTable(null, "hudi", "file://bucket/path", null, 
null));
+        () -> new ExternalTable(null, "hudi", "file://bucket/path", null, 
null, null));
   }
 }
diff --git 
a/xtable-api/src/test/java/org/apache/xtable/conversion/TestSourceTable.java 
b/xtable-api/src/test/java/org/apache/xtable/conversion/TestSourceTable.java
index 42ea63da..05effaf9 100644
--- a/xtable-api/src/test/java/org/apache/xtable/conversion/TestSourceTable.java
+++ b/xtable-api/src/test/java/org/apache/xtable/conversion/TestSourceTable.java
@@ -25,21 +25,21 @@ import org.junit.jupiter.api.Test;
 class TestSourceTable {
   @Test
   void dataPathDefaultsToMetadataPath() {
-    String metadataPath = "file:///path/to/table";
+    String basePath = "file:///path/to/table";
     SourceTable sourceTable =
-        
SourceTable.builder().name("name").formatName("hudi").metadataPath(metadataPath).build();
-    assertEquals(metadataPath, sourceTable.getDataPath());
+        
SourceTable.builder().name("name").formatName("hudi").basePath(basePath).build();
+    assertEquals(basePath, sourceTable.getDataPath());
   }
 
   @Test
   void dataPathIsSanitized() {
-    String metadataPath = "file:///path/to/table";
+    String basePath = "file:///path/to/table";
     String dataPath = "file:///path/to/table//data";
     SourceTable sourceTable =
         SourceTable.builder()
             .name("name")
             .formatName("hudi")
-            .metadataPath(metadataPath)
+            .basePath(basePath)
             .dataPath(dataPath)
             .build();
     assertEquals("file:///path/to/table/data", sourceTable.getDataPath());
diff --git 
a/xtable-api/src/test/java/org/apache/xtable/conversion/TestTargetTable.java 
b/xtable-api/src/test/java/org/apache/xtable/conversion/TestTargetTable.java
index 2a40a8a1..9faa22c8 100644
--- a/xtable-api/src/test/java/org/apache/xtable/conversion/TestTargetTable.java
+++ b/xtable-api/src/test/java/org/apache/xtable/conversion/TestTargetTable.java
@@ -27,9 +27,9 @@ import org.junit.jupiter.api.Test;
 class TestTargetTable {
   @Test
   void retentionDefaultsToSevenDays() {
-    String metadataPath = "file:///path/to/table";
+    String basePath = "file:///path/to/table";
     TargetTable targetTable =
-        
TargetTable.builder().name("name").formatName("hudi").metadataPath(metadataPath).build();
+        
TargetTable.builder().name("name").formatName("hudi").basePath(basePath).build();
     assertEquals(Duration.ofDays(7), targetTable.getMetadataRetention());
   }
 }
diff --git 
a/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionController.java
 
b/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionController.java
index fdfdeb6e..17911134 100644
--- 
a/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionController.java
+++ 
b/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionController.java
@@ -87,8 +87,7 @@ public class ConversionController {
     }
 
     try (ConversionSource<COMMIT> conversionSource =
-        conversionSourceProvider.getConversionSourceInstance(
-            config.getSourceTable(), config.getProperties())) {
+        
conversionSourceProvider.getConversionSourceInstance(config.getSourceTable())) {
       ExtractFromSource<COMMIT> source = 
ExtractFromSource.of(conversionSource);
 
       Map<String, ConversionTarget> conversionTargetByFormat =
diff --git 
a/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionSourceProvider.java
 
b/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionSourceProvider.java
index e9146f50..ccd6fde7 100644
--- 
a/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionSourceProvider.java
+++ 
b/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionSourceProvider.java
@@ -18,8 +18,6 @@
  
 package org.apache.xtable.conversion;
 
-import java.util.Map;
-
 import org.apache.hadoop.conf.Configuration;
 
 import org.apache.xtable.spi.extractor.ConversionSource;
@@ -47,5 +45,5 @@ public abstract class ConversionSourceProvider<COMMIT> {
    * @return the conversion source
    */
   public abstract ConversionSource<COMMIT> getConversionSourceInstance(
-      SourceTable sourceTableConfig, Map<String, String> clientConf);
+      SourceTable sourceTableConfig);
 }
diff --git 
a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionSourceProvider.java
 
b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionSourceProvider.java
index 53d3bf54..2ba79a88 100644
--- 
a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionSourceProvider.java
+++ 
b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionSourceProvider.java
@@ -30,14 +30,13 @@ import org.apache.xtable.conversion.SourceTable;
 /** A concrete implementation of {@link ConversionSourceProvider} for Delta 
Lake table format. */
 public class DeltaConversionSourceProvider extends 
ConversionSourceProvider<Long> {
   @Override
-  public DeltaConversionSource getConversionSourceInstance(
-      SourceTable sourceTable, Map<String, String> clientConf) {
+  public DeltaConversionSource getConversionSourceInstance(SourceTable 
sourceTable) {
     SparkSession sparkSession = 
DeltaConversionUtils.buildSparkSession(hadoopConf);
-    DeltaTable deltaTable = DeltaTable.forPath(sparkSession, 
sourceTable.getMetadataPath());
+    DeltaTable deltaTable = DeltaTable.forPath(sparkSession, 
sourceTable.getBasePath());
     return DeltaConversionSource.builder()
         .sparkSession(sparkSession)
         .tableName(sourceTable.getName())
-        .basePath(sourceTable.getMetadataPath())
+        .basePath(sourceTable.getBasePath())
         .deltaTable(deltaTable)
         .deltaLog(deltaTable.deltaLog())
         .build();
diff --git 
a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionTarget.java 
b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionTarget.java
index 6dc9a772..b34fa449 100644
--- 
a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionTarget.java
+++ 
b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionTarget.java
@@ -83,7 +83,7 @@ public class DeltaConversionTarget implements 
ConversionTarget {
 
   public DeltaConversionTarget(TargetTable targetTable, SparkSession 
sparkSession) {
     this(
-        targetTable.getMetadataPath(),
+        targetTable.getBasePath(),
         targetTable.getName(),
         targetTable.getMetadataRetention().toHours(),
         sparkSession,
@@ -138,7 +138,7 @@ public class DeltaConversionTarget implements 
ConversionTarget {
     SparkSession sparkSession = 
DeltaConversionUtils.buildSparkSession(configuration);
 
     _init(
-        targetTable.getMetadataPath(),
+        targetTable.getBasePath(),
         targetTable.getName(),
         targetTable.getMetadataRetention().toHours(),
         sparkSession,
diff --git 
a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionSourceProvider.java
 
b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionSourceProvider.java
index 773113ed..51c0afb9 100644
--- 
a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionSourceProvider.java
+++ 
b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionSourceProvider.java
@@ -18,8 +18,6 @@
  
 package org.apache.xtable.hudi;
 
-import java.util.Map;
-
 import lombok.extern.log4j.Log4j2;
 
 import org.apache.hudi.common.model.HoodieTableType;
@@ -35,11 +33,11 @@ public class HudiConversionSourceProvider extends 
ConversionSourceProvider<Hoodi
 
   @Override
   public HudiConversionSource getConversionSourceInstance(
-      SourceTable sourceTable, Map<String, String> clientConfigs) {
+      SourceTable sourceTable) {
     HoodieTableMetaClient metaClient =
         HoodieTableMetaClient.builder()
             .setConf(hadoopConf)
-            .setBasePath(sourceTable.getMetadataPath())
+            .setBasePath(sourceTable.getBasePath())
             .setLoadActiveTimelineOnLoad(true)
             .build();
     if 
(!metaClient.getTableConfig().getTableType().equals(HoodieTableType.COPY_ON_WRITE))
 {
@@ -47,7 +45,7 @@ public class HudiConversionSourceProvider extends 
ConversionSourceProvider<Hoodi
     }
 
     final HudiSourcePartitionSpecExtractor sourcePartitionSpecExtractor =
-        
HudiSourceConfig.fromProperties(clientConfigs).loadSourcePartitionSpecExtractor();
+        
HudiSourceConfig.fromProperties(sourceTable.getAdditionalProperties()).loadSourcePartitionSpecExtractor();
 
     return new HudiConversionSource(metaClient, sourcePartitionSpecExtractor);
   }
diff --git 
a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionTarget.java 
b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionTarget.java
index e95391ad..c1852ea8 100644
--- a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionTarget.java
+++ b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionTarget.java
@@ -112,12 +112,12 @@ public class HudiConversionTarget implements 
ConversionTarget {
       Configuration configuration,
       int maxNumDeltaCommitsBeforeCompaction) {
     this(
-        targetTable.getMetadataPath(),
+        targetTable.getBasePath(),
         (int) targetTable.getMetadataRetention().toHours(),
         maxNumDeltaCommitsBeforeCompaction,
         BaseFileUpdatesExtractor.of(
             new HoodieJavaEngineContext(configuration),
-            new CachingPath(targetTable.getMetadataPath())),
+            new CachingPath(targetTable.getBasePath())),
         AvroSchemaConverter.getInstance(),
         HudiTableManager.of(configuration),
         CommitState::new);
@@ -165,12 +165,12 @@ public class HudiConversionTarget implements 
ConversionTarget {
   @Override
   public void init(TargetTable targetTable, Configuration configuration) {
     _init(
-        targetTable.getMetadataPath(),
+        targetTable.getBasePath(),
         (int) targetTable.getMetadataRetention().toHours(),
         HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.defaultValue(),
         BaseFileUpdatesExtractor.of(
             new HoodieJavaEngineContext(configuration),
-            new CachingPath(targetTable.getMetadataPath())),
+            new CachingPath(targetTable.getBasePath())),
         AvroSchemaConverter.getInstance(),
         HudiTableManager.of(configuration),
         CommitState::new);
diff --git 
a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiSourceConfig.java 
b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiSourceConfig.java
index 944cd221..88717d1e 100644
--- a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiSourceConfig.java
+++ b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiSourceConfig.java
@@ -23,6 +23,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
+import java.util.Properties;
 
 import lombok.Value;
 
@@ -48,12 +49,12 @@ public class HudiSourceConfig {
         parsePartitionFieldSpecs(partitionFieldSpecConfig));
   }
 
-  public static HudiSourceConfig fromProperties(Map<String, String> 
properties) {
+  public static HudiSourceConfig fromProperties(Properties properties) {
     String partitionSpecExtractorClass =
-        properties.getOrDefault(
+        properties.getProperty(
             PARTITION_SPEC_EXTRACTOR_CLASS,
             ConfigurationBasedPartitionSpecExtractor.class.getName());
-    String partitionFieldSpecString = 
properties.get(PARTITION_FIELD_SPEC_CONFIG);
+    String partitionFieldSpecString = 
properties.getProperty(PARTITION_FIELD_SPEC_CONFIG);
     List<PartitionFieldSpec> partitionFieldSpecs =
         parsePartitionFieldSpecs(partitionFieldSpecString);
     return new HudiSourceConfig(partitionSpecExtractorClass, 
partitionFieldSpecs);
diff --git 
a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSource.java
 
b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSource.java
index 05434b6c..f96ec714 100644
--- 
a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSource.java
+++ 
b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSource.java
@@ -96,7 +96,7 @@ public class IcebergConversionSource implements 
ConversionSource<Snapshot> {
     return tableManager.getTable(
         (IcebergCatalogConfig) sourceTableConfig.getCatalogConfig(),
         tableIdentifier,
-        sourceTableConfig.getMetadataPath());
+        sourceTableConfig.getBasePath());
   }
 
   private FileIO initTableOps() {
diff --git 
a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSourceProvider.java
 
b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSourceProvider.java
index 8873caab..449ebe5d 100644
--- 
a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSourceProvider.java
+++ 
b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSourceProvider.java
@@ -18,8 +18,6 @@
  
 package org.apache.xtable.iceberg;
 
-import java.util.Map;
-
 import org.apache.iceberg.Snapshot;
 
 import org.apache.xtable.conversion.ConversionSourceProvider;
@@ -28,8 +26,7 @@ import org.apache.xtable.conversion.SourceTable;
 /** A concrete implementation of {@link ConversionSourceProvider} for Hudi 
table format. */
 public class IcebergConversionSourceProvider extends 
ConversionSourceProvider<Snapshot> {
   @Override
-  public IcebergConversionSource getConversionSourceInstance(
-      SourceTable sourceTableConfig, Map<String, String> clientConf) {
+  public IcebergConversionSource getConversionSourceInstance(SourceTable 
sourceTableConfig) {
     return IcebergConversionSource.builder()
         .sourceTableConfig(sourceTableConfig)
         .hadoopConf(hadoopConf)
diff --git 
a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionTarget.java
 
b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionTarget.java
index fe9ddb37..ecdbfa26 100644
--- 
a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionTarget.java
+++ 
b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionTarget.java
@@ -104,7 +104,7 @@ public class IcebergConversionTarget implements 
ConversionTarget {
     this.partitionSpecSync = partitionSpecSync;
     this.dataFileUpdatesExtractor = dataFileUpdatesExtractor;
     String tableName = targetTable.getName();
-    this.basePath = targetTable.getMetadataPath();
+    this.basePath = targetTable.getBasePath();
     this.configuration = configuration;
     this.snapshotRetentionInHours = (int) 
targetTable.getMetadataRetention().toHours();
     String[] namespace = targetTable.getNamespace();
diff --git 
a/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java 
b/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java
index 58561c05..f17ad1be 100644
--- a/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java
+++ b/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java
@@ -43,6 +43,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Properties;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
@@ -852,12 +853,15 @@ public class ITConversionController {
       List<String> targetTableFormats,
       String partitionConfig,
       Duration metadataRetention) {
+    Properties sourceProperties = new Properties();
+    sourceProperties.put(PARTITION_FIELD_SPEC_CONFIG, partitionConfig);
     SourceTable sourceTable =
         SourceTable.builder()
             .name(tableName)
             .formatName(sourceTableFormat)
-            .metadataPath(table.getBasePath())
+            .basePath(table.getBasePath())
             .dataPath(table.getDataPath())
+            .additionalProperties(sourceProperties)
             .build();
 
     List<TargetTable> targetTables =
@@ -868,7 +872,7 @@ public class ITConversionController {
                         .name(tableName)
                         .formatName(formatName)
                         // set the metadata path to the data path as the 
default (required by Hudi)
-                        .metadataPath(table.getDataPath())
+                        .basePath(table.getDataPath())
                         .metadataRetention(metadataRetention)
                         .build())
             .collect(Collectors.toList());
@@ -877,7 +881,6 @@ public class ITConversionController {
         .sourceTable(sourceTable)
         .targetTables(targetTables)
         .syncMode(syncMode)
-        .properties(Collections.singletonMap(PARTITION_FIELD_SPEC_CONFIG, 
partitionConfig))
         .build();
   }
 }
diff --git 
a/xtable-core/src/test/java/org/apache/xtable/conversion/TestConversionController.java
 
b/xtable-core/src/test/java/org/apache/xtable/conversion/TestConversionController.java
index d26accc1..6c5003e4 100644
--- 
a/xtable-core/src/test/java/org/apache/xtable/conversion/TestConversionController.java
+++ 
b/xtable-core/src/test/java/org/apache/xtable/conversion/TestConversionController.java
@@ -82,7 +82,7 @@ public class TestConversionController {
     TableSyncConfig tableSyncConfig =
         getTableSyncConfig(Arrays.asList(TableFormat.ICEBERG, 
TableFormat.DELTA), syncMode);
     when(mockConversionSourceProvider.getConversionSourceInstance(
-            tableSyncConfig.getSourceTable(), tableSyncConfig.getProperties()))
+            tableSyncConfig.getSourceTable()))
         .thenReturn(mockConversionSource);
     when(mockConversionTargetFactory.createForFormat(
             tableSyncConfig.getTargetTables().get(0), mockConf))
@@ -108,7 +108,7 @@ public class TestConversionController {
     TableSyncConfig tableSyncConfig =
         getTableSyncConfig(Arrays.asList(TableFormat.ICEBERG, 
TableFormat.DELTA), syncMode);
     when(mockConversionSourceProvider.getConversionSourceInstance(
-            tableSyncConfig.getSourceTable(), tableSyncConfig.getProperties()))
+            tableSyncConfig.getSourceTable()))
         .thenReturn(mockConversionSource);
     when(mockConversionTargetFactory.createForFormat(
             tableSyncConfig.getTargetTables().get(0), mockConf))
@@ -201,7 +201,7 @@ public class TestConversionController {
     TableSyncConfig tableSyncConfig =
         getTableSyncConfig(Arrays.asList(TableFormat.ICEBERG, 
TableFormat.DELTA), syncMode);
     when(mockConversionSourceProvider.getConversionSourceInstance(
-            tableSyncConfig.getSourceTable(), tableSyncConfig.getProperties()))
+            tableSyncConfig.getSourceTable()))
         .thenReturn(mockConversionSource);
     when(mockConversionTargetFactory.createForFormat(
             tableSyncConfig.getTargetTables().get(0), mockConf))
@@ -238,7 +238,7 @@ public class TestConversionController {
     TableSyncConfig tableSyncConfig =
         getTableSyncConfig(Arrays.asList(TableFormat.ICEBERG, 
TableFormat.DELTA), syncMode);
     when(mockConversionSourceProvider.getConversionSourceInstance(
-            tableSyncConfig.getSourceTable(), tableSyncConfig.getProperties()))
+            tableSyncConfig.getSourceTable()))
         .thenReturn(mockConversionSource);
     when(mockConversionTargetFactory.createForFormat(
             tableSyncConfig.getTargetTables().get(0), mockConf))
@@ -322,7 +322,7 @@ public class TestConversionController {
     TableSyncConfig tableSyncConfig =
         getTableSyncConfig(Arrays.asList(TableFormat.ICEBERG, 
TableFormat.DELTA), syncMode);
     when(mockConversionSourceProvider.getConversionSourceInstance(
-            tableSyncConfig.getSourceTable(), tableSyncConfig.getProperties()))
+            tableSyncConfig.getSourceTable()))
         .thenReturn(mockConversionSource);
     when(mockConversionTargetFactory.createForFormat(
             tableSyncConfig.getTargetTables().get(0), mockConf))
diff --git 
a/xtable-core/src/test/java/org/apache/xtable/conversion/TestTableSyncConfig.java
 
b/xtable-core/src/test/java/org/apache/xtable/conversion/TestTableSyncConfig.java
index 66698c06..6c927afe 100644
--- 
a/xtable-core/src/test/java/org/apache/xtable/conversion/TestTableSyncConfig.java
+++ 
b/xtable-core/src/test/java/org/apache/xtable/conversion/TestTableSyncConfig.java
@@ -38,7 +38,6 @@ class TestTableSyncConfig {
             .targetTables(Collections.singletonList(mock(TargetTable.class)))
             .build();
 
-    assertEquals(Collections.emptyMap(), tableSyncConfig.getProperties());
     assertEquals(SyncMode.INCREMENTAL, tableSyncConfig.getSyncMode());
   }
 
diff --git 
a/xtable-core/src/test/java/org/apache/xtable/delta/ITDeltaConversionTargetSource.java
 
b/xtable-core/src/test/java/org/apache/xtable/delta/ITDeltaConversionTargetSource.java
index a1e0d675..21152787 100644
--- 
a/xtable-core/src/test/java/org/apache/xtable/delta/ITDeltaConversionTargetSource.java
+++ 
b/xtable-core/src/test/java/org/apache/xtable/delta/ITDeltaConversionTargetSource.java
@@ -167,11 +167,11 @@ public class ITDeltaConversionTargetSource {
     SourceTable tableConfig =
         SourceTable.builder()
             .name(tableName)
-            .metadataPath(basePath.toString())
+            .basePath(basePath.toString())
             .formatName(TableFormat.DELTA)
             .build();
     DeltaConversionSource conversionSource =
-        conversionSourceProvider.getConversionSourceInstance(tableConfig, 
Collections.emptyMap());
+        conversionSourceProvider.getConversionSourceInstance(tableConfig);
     // Get current snapshot
     InternalSnapshot snapshot = conversionSource.getCurrentSnapshot();
     // Validate table
@@ -225,11 +225,11 @@ public class ITDeltaConversionTargetSource {
     SourceTable tableConfig =
         SourceTable.builder()
             .name(tableName)
-            .metadataPath(basePath.toString())
+            .basePath(basePath.toString())
             .formatName(TableFormat.DELTA)
             .build();
     DeltaConversionSource conversionSource =
-        conversionSourceProvider.getConversionSourceInstance(tableConfig, Collections.emptyMap());
+        conversionSourceProvider.getConversionSourceInstance(tableConfig);
     // Get current snapshot
     InternalSnapshot snapshot = conversionSource.getCurrentSnapshot();
     // Validate table
@@ -313,11 +313,11 @@ public class ITDeltaConversionTargetSource {
     SourceTable tableConfig =
         SourceTable.builder()
             .name(tableName)
-            .metadataPath(basePath.toString())
+            .basePath(basePath.toString())
             .formatName(TableFormat.DELTA)
             .build();
     DeltaConversionSource conversionSource =
-        conversionSourceProvider.getConversionSourceInstance(tableConfig, Collections.emptyMap());
+        conversionSourceProvider.getConversionSourceInstance(tableConfig);
     // Get current snapshot
     InternalSnapshot snapshot = conversionSource.getCurrentSnapshot();
   }
@@ -352,11 +352,11 @@ public class ITDeltaConversionTargetSource {
     SourceTable tableConfig =
         SourceTable.builder()
             .name(testSparkDeltaTable.getTableName())
-            .metadataPath(testSparkDeltaTable.getBasePath())
+            .basePath(testSparkDeltaTable.getBasePath())
             .formatName(TableFormat.DELTA)
             .build();
     DeltaConversionSource conversionSource =
-        conversionSourceProvider.getConversionSourceInstance(tableConfig, Collections.emptyMap());
+        conversionSourceProvider.getConversionSourceInstance(tableConfig);
     assertEquals(180L, testSparkDeltaTable.getNumRows());
     InternalSnapshot internalSnapshot = conversionSource.getCurrentSnapshot();
 
@@ -392,11 +392,11 @@ public class ITDeltaConversionTargetSource {
     SourceTable tableConfig =
         SourceTable.builder()
             .name(testSparkDeltaTable.getTableName())
-            .metadataPath(testSparkDeltaTable.getBasePath())
+            .basePath(testSparkDeltaTable.getBasePath())
             .formatName(TableFormat.DELTA)
             .build();
     DeltaConversionSource conversionSource =
-        conversionSourceProvider.getConversionSourceInstance(tableConfig, Collections.emptyMap());
+        conversionSourceProvider.getConversionSourceInstance(tableConfig);
     InternalSnapshot snapshotAfterCommit1 = conversionSource.getCurrentSnapshot();
     List<String> allActivePaths = ValidationTestHelper.getAllFilePaths(snapshotAfterCommit1);
     assertEquals(1, allActivePaths.size());
@@ -416,7 +416,7 @@ public class ITDeltaConversionTargetSource {
             .lastSyncInstant(Instant.ofEpochMilli(timestamp1))
             .build();
     conversionSource =
-        conversionSourceProvider.getConversionSourceInstance(tableConfig, Collections.emptyMap());
+        conversionSourceProvider.getConversionSourceInstance(tableConfig);
     CommitsBacklog<Long> instantCurrentCommitState =
         conversionSource.getCommitsBacklog(instantsForIncrementalSync);
     boolean areFilesRemoved = false;
@@ -461,11 +461,11 @@ public class ITDeltaConversionTargetSource {
     SourceTable tableConfig =
         SourceTable.builder()
             .name(testSparkDeltaTable.getTableName())
-            .metadataPath(testSparkDeltaTable.getBasePath())
+            .basePath(testSparkDeltaTable.getBasePath())
             .formatName(TableFormat.DELTA)
             .build();
     DeltaConversionSource conversionSource =
-        conversionSourceProvider.getConversionSourceInstance(tableConfig, Collections.emptyMap());
+        conversionSourceProvider.getConversionSourceInstance(tableConfig);
     assertEquals(130L, testSparkDeltaTable.getNumRows());
     InternalSnapshot internalSnapshot = conversionSource.getCurrentSnapshot();
     if (isPartitioned) {
@@ -509,11 +509,11 @@ public class ITDeltaConversionTargetSource {
     SourceTable tableConfig =
         SourceTable.builder()
             .name(testSparkDeltaTable.getTableName())
-            .metadataPath(testSparkDeltaTable.getBasePath())
+            .basePath(testSparkDeltaTable.getBasePath())
             .formatName(TableFormat.DELTA)
             .build();
     DeltaConversionSource conversionSource =
-        conversionSourceProvider.getConversionSourceInstance(tableConfig, Collections.emptyMap());
+        conversionSourceProvider.getConversionSourceInstance(tableConfig);
     assertEquals(150L, testSparkDeltaTable.getNumRows());
     InternalSnapshot internalSnapshot = conversionSource.getCurrentSnapshot();
     if (isPartitioned) {
@@ -566,11 +566,11 @@ public class ITDeltaConversionTargetSource {
     SourceTable tableConfig =
         SourceTable.builder()
             .name(testSparkDeltaTable.getTableName())
-            .metadataPath(testSparkDeltaTable.getBasePath())
+            .basePath(testSparkDeltaTable.getBasePath())
             .formatName(TableFormat.DELTA)
             .build();
     DeltaConversionSource conversionSource =
-        conversionSourceProvider.getConversionSourceInstance(tableConfig, Collections.emptyMap());
+        conversionSourceProvider.getConversionSourceInstance(tableConfig);
     assertEquals(
         120 - rowsByPartition.get(partitionValueToDelete).size(), testSparkDeltaTable.getNumRows());
     InternalSnapshot internalSnapshot = conversionSource.getCurrentSnapshot();
@@ -626,11 +626,11 @@ public class ITDeltaConversionTargetSource {
     SourceTable tableConfig =
         SourceTable.builder()
             .name(testSparkDeltaTable.getTableName())
-            .metadataPath(testSparkDeltaTable.getBasePath())
+            .basePath(testSparkDeltaTable.getBasePath())
             .formatName(TableFormat.DELTA)
             .build();
     DeltaConversionSource conversionSource =
-        conversionSourceProvider.getConversionSourceInstance(tableConfig, Collections.emptyMap());
+        conversionSourceProvider.getConversionSourceInstance(tableConfig);
     assertEquals(250L, testSparkDeltaTable.getNumRows());
     InternalSnapshot internalSnapshot = conversionSource.getCurrentSnapshot();
     if (isPartitioned) {
diff --git a/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaSync.java b/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaSync.java
index 04132c84..f0f889d2 100644
--- a/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaSync.java
+++ b/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaSync.java
@@ -130,7 +130,7 @@ public class TestDeltaSync {
         new DeltaConversionTarget(
             TargetTable.builder()
                 .name(tableName)
-                .metadataPath(basePath.toString())
+                .basePath(basePath.toString())
                 .metadataRetention(Duration.of(1, ChronoUnit.HOURS))
                 .formatName(TableFormat.DELTA)
                 .build(),
diff --git a/xtable-core/src/test/java/org/apache/xtable/hudi/ITHudiConversionSourceTarget.java b/xtable-core/src/test/java/org/apache/xtable/hudi/ITHudiConversionSourceTarget.java
index 5588def1..12885567 100644
--- a/xtable-core/src/test/java/org/apache/xtable/hudi/ITHudiConversionSourceTarget.java
+++ b/xtable-core/src/test/java/org/apache/xtable/hudi/ITHudiConversionSourceTarget.java
@@ -584,7 +584,7 @@ public class ITHudiConversionSourceTarget {
   private HudiConversionTarget getTargetClient() {
     return new HudiConversionTarget(
         TargetTable.builder()
-            .metadataPath(tableBasePath)
+            .basePath(tableBasePath)
             .formatName(TableFormat.HUDI)
             .name("test_table")
             .metadataRetention(Duration.of(4, ChronoUnit.HOURS))
diff --git a/xtable-core/src/test/java/org/apache/xtable/iceberg/ITIcebergConversionTargetSource.java b/xtable-core/src/test/java/org/apache/xtable/iceberg/ITIcebergConversionTargetSource.java
index 5c3e1628..3f20ac9d 100644
--- a/xtable-core/src/test/java/org/apache/xtable/iceberg/ITIcebergConversionTargetSource.java
+++ b/xtable-core/src/test/java/org/apache/xtable/iceberg/ITIcebergConversionTargetSource.java
@@ -29,7 +29,6 @@ import java.nio.file.Path;
 import java.time.Instant;
 import java.time.temporal.ChronoUnit;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
@@ -99,11 +98,11 @@ public class ITIcebergConversionTargetSource {
       SourceTable tableConfig =
           SourceTable.builder()
               .name(testIcebergTable.getTableName())
-              .metadataPath(testIcebergTable.getBasePath())
+              .basePath(testIcebergTable.getBasePath())
               .formatName(TableFormat.ICEBERG)
               .build();
       IcebergConversionSource conversionSource =
-          sourceProvider.getConversionSourceInstance(tableConfig, Collections.emptyMap());
+          sourceProvider.getConversionSourceInstance(tableConfig);
       assertEquals(180L, testIcebergTable.getNumRows());
       InternalSnapshot internalSnapshot = conversionSource.getCurrentSnapshot();
 
@@ -159,11 +158,11 @@ public class ITIcebergConversionTargetSource {
       SourceTable tableConfig =
           SourceTable.builder()
               .name(testIcebergTable.getTableName())
-              .metadataPath(testIcebergTable.getBasePath())
+              .basePath(testIcebergTable.getBasePath())
               .formatName(TableFormat.ICEBERG)
               .build();
       IcebergConversionSource conversionSource =
-          sourceProvider.getConversionSourceInstance(tableConfig, Collections.emptyMap());
+          sourceProvider.getConversionSourceInstance(tableConfig);
       assertEquals(
           120 - recordsByPartition.get(partitionValueToDelete).size(),
           testIcebergTable.getNumRows());
@@ -219,11 +218,11 @@ public class ITIcebergConversionTargetSource {
       SourceTable tableConfig =
           SourceTable.builder()
               .name(testIcebergTable.getTableName())
-              .metadataPath(testIcebergTable.getBasePath())
+              .basePath(testIcebergTable.getBasePath())
               .formatName(TableFormat.ICEBERG)
               .build();
       IcebergConversionSource conversionSource =
-          sourceProvider.getConversionSourceInstance(tableConfig, Collections.emptyMap());
+          sourceProvider.getConversionSourceInstance(tableConfig);
       assertEquals(
           120 - recordsByPartition.get(partitionValueToDelete).size(),
           testIcebergTable.getNumRows());
@@ -279,11 +278,11 @@ public class ITIcebergConversionTargetSource {
       SourceTable tableConfig =
           SourceTable.builder()
               .name(testIcebergTable.getTableName())
-              .metadataPath(testIcebergTable.getBasePath())
+              .basePath(testIcebergTable.getBasePath())
               .formatName(TableFormat.ICEBERG)
               .build();
       IcebergConversionSource conversionSource =
-          sourceProvider.getConversionSourceInstance(tableConfig, Collections.emptyMap());
+          sourceProvider.getConversionSourceInstance(tableConfig);
       assertEquals(200L, testIcebergTable.getNumRows());
       InternalSnapshot internalSnapshot = conversionSource.getCurrentSnapshot();
 
@@ -320,7 +319,7 @@ public class ITIcebergConversionTargetSource {
       SourceTable tableConfig =
           SourceTable.builder()
               .name(testIcebergTable.getTableName())
-              .metadataPath(testIcebergTable.getBasePath())
+              .basePath(testIcebergTable.getBasePath())
               .formatName(TableFormat.ICEBERG)
               .build();
 
@@ -336,7 +335,7 @@ public class ITIcebergConversionTargetSource {
         testIcebergTable.expireSnapshot(snapshotIdAfterCommit2);
       }
       IcebergConversionSource conversionSource =
-          sourceProvider.getConversionSourceInstance(tableConfig, Collections.emptyMap());
+          sourceProvider.getConversionSourceInstance(tableConfig);
       if (shouldExpireSnapshots) {
         
assertFalse(conversionSource.isIncrementalSyncSafeFrom(Instant.ofEpochMilli(timestamp1)));
       } else {
diff --git a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergConversionTargetSource.java b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergConversionTargetSource.java
index 021d5071..9dd4ffc7 100644
--- a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergConversionTargetSource.java
+++ b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergConversionTargetSource.java
@@ -26,7 +26,6 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.time.Instant;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
@@ -93,7 +92,7 @@ class TestIcebergConversionTargetSource {
     SourceTable sourceTableConfig = getPerTableConfig(catalogSales);
 
     IcebergConversionSource conversionSource =
-        sourceProvider.getConversionSourceInstance(sourceTableConfig, Collections.emptyMap());
+        sourceProvider.getConversionSourceInstance(sourceTableConfig);
 
     Snapshot snapshot = catalogSales.currentSnapshot();
     InternalTable internalTable = conversionSource.getTable(snapshot);
diff --git a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergSync.java b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergSync.java
index d82c15ac..bd36dde9 100644
--- a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergSync.java
+++ b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergSync.java
@@ -192,7 +192,7 @@ public class TestIcebergSync {
   private IcebergConversionTarget getConversionTarget() {
     return new IcebergConversionTarget(
         TargetTable.builder()
-            .metadataPath(basePath.toString())
+            .basePath(basePath.toString())
             .name(tableName)
             .metadataRetention(Duration.of(1, ChronoUnit.HOURS))
             .formatName(TableFormat.ICEBERG)
diff --git a/xtable-core/src/test/java/org/apache/xtable/loadtest/LoadTest.java b/xtable-core/src/test/java/org/apache/xtable/loadtest/LoadTest.java
index 2361d30b..df135db7 100644
--- a/xtable-core/src/test/java/org/apache/xtable/loadtest/LoadTest.java
+++ b/xtable-core/src/test/java/org/apache/xtable/loadtest/LoadTest.java
@@ -24,6 +24,7 @@ import java.nio.file.Path;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.Properties;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
@@ -135,12 +136,15 @@ public class LoadTest {
 
   private static TableSyncConfig getTableSyncConfig(
       SyncMode syncMode, String tableName, GenericTable table, List<String> targetTableFormats) {
+    Properties sourceProperties = new Properties();
+    sourceProperties.put(PARTITION_FIELD_SPEC_CONFIG, "level:VALUE");
     SourceTable sourceTable =
         SourceTable.builder()
             .name(tableName)
             .formatName(TableFormat.HUDI)
-            .metadataPath(table.getBasePath())
+            .basePath(table.getBasePath())
             .dataPath(table.getDataPath())
+            .additionalProperties(sourceProperties)
             .build();
 
     List<TargetTable> targetTables =
@@ -150,7 +154,7 @@ public class LoadTest {
                     TargetTable.builder()
                         .name(tableName)
                         .formatName(formatName)
-                        .metadataPath(table.getBasePath())
+                        .basePath(table.getBasePath())
                         .build())
             .collect(Collectors.toList());
 
@@ -158,7 +162,6 @@ public class LoadTest {
         .sourceTable(sourceTable)
         .targetTables(targetTables)
         .syncMode(syncMode)
-        .properties(Collections.singletonMap(PARTITION_FIELD_SPEC_CONFIG, "level:VALUE"))
         .build();
   }
 }
diff --git a/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunSync.java b/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunSync.java
index ffa617e0..16821a96 100644
--- a/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunSync.java
+++ b/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunSync.java
@@ -26,6 +26,7 @@ import java.nio.file.Paths;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 import java.util.stream.Collectors;
 
 import lombok.Data;
@@ -151,6 +152,10 @@ public class RunSync {
           "Running sync for basePath {} for following table formats {}",
           table.getTableBasePath(),
           tableFormatList);
+      Properties sourceProperties = new Properties();
+      if (table.getPartitionSpec() != null) {
+        sourceProperties.put(HudiSourceConfig.PARTITION_FIELD_SPEC_CONFIG, table.getPartitionSpec());
+      }
       SourceTable sourceTable =
           SourceTable.builder()
               .name(table.getTableName())
@@ -158,6 +163,7 @@ public class RunSync {
               .namespace(table.getNamespace() == null ? null : table.getNamespace().split("\\."))
               .dataPath(table.getTableDataPath())
               .catalogConfig(icebergCatalogConfig)
+              .additionalProperties(sourceProperties)
               .formatName(sourceFormat)
               .build();
       List<TargetTable> targetTables =
@@ -182,9 +188,6 @@ public class RunSync {
               .sourceTable(sourceTable)
               .targetTables(targetTables)
               .syncMode(SyncMode.INCREMENTAL)
-              .properties(
-                  Collections.singletonMap(
-                      HudiSourceConfig.PARTITION_FIELD_SPEC_CONFIG, table.getPartitionSpec()))
               .build();
       try {
         conversionController.sync(tableSyncConfig, conversionSourceProvider);

Reply via email to