This is an automated email from the ASF dual-hosted git repository. timbrown pushed a commit to branch 297-properties-based-config-2 in repository https://gitbox.apache.org/repos/asf/incubator-xtable.git
commit 08c31514de18643bf0b1908acef889d276dc27a7 Author: Timothy Brown <t...@onehouse.ai> AuthorDate: Fri Aug 2 14:34:36 2024 -0400 PR feedback --- .../apache/xtable/conversion/ExternalTable.java | 41 ++++++++++++++++++---- .../org/apache/xtable/conversion/SourceTable.java | 16 ++++++--- .../apache/xtable/conversion/TableSyncConfig.java | 6 +--- .../org/apache/xtable/conversion/TargetTable.java | 10 +++--- .../xtable/conversion/TestExternalTable.java | 18 +++++----- .../apache/xtable/conversion/TestSourceTable.java | 10 +++--- .../apache/xtable/conversion/TestTargetTable.java | 4 +-- .../xtable/conversion/ConversionController.java | 3 +- .../conversion/ConversionSourceProvider.java | 4 +-- .../delta/DeltaConversionSourceProvider.java | 7 ++-- .../apache/xtable/delta/DeltaConversionTarget.java | 4 +-- .../xtable/hudi/HudiConversionSourceProvider.java | 8 ++--- .../apache/xtable/hudi/HudiConversionTarget.java | 8 ++--- .../org/apache/xtable/hudi/HudiSourceConfig.java | 7 ++-- .../xtable/iceberg/IcebergConversionSource.java | 2 +- .../iceberg/IcebergConversionSourceProvider.java | 5 +-- .../xtable/iceberg/IcebergConversionTarget.java | 2 +- .../org/apache/xtable/ITConversionController.java | 9 +++-- .../conversion/TestConversionController.java | 10 +++--- .../xtable/conversion/TestTableSyncConfig.java | 1 - .../delta/ITDeltaConversionTargetSource.java | 38 ++++++++++---------- .../org/apache/xtable/delta/TestDeltaSync.java | 2 +- .../xtable/hudi/ITHudiConversionSourceTarget.java | 2 +- .../iceberg/ITIcebergConversionTargetSource.java | 21 ++++++----- .../iceberg/TestIcebergConversionTargetSource.java | 3 +- .../org/apache/xtable/iceberg/TestIcebergSync.java | 2 +- .../java/org/apache/xtable/loadtest/LoadTest.java | 9 +++-- .../java/org/apache/xtable/utilities/RunSync.java | 9 +++-- 28 files changed, 145 insertions(+), 116 deletions(-) diff --git a/xtable-api/src/main/java/org/apache/xtable/conversion/ExternalTable.java 
b/xtable-api/src/main/java/org/apache/xtable/conversion/ExternalTable.java index eeb690d2..f6580c4b 100644 --- a/xtable-api/src/main/java/org/apache/xtable/conversion/ExternalTable.java +++ b/xtable-api/src/main/java/org/apache/xtable/conversion/ExternalTable.java @@ -26,26 +26,53 @@ import org.apache.hadoop.fs.Path; import com.google.common.base.Preconditions; +import java.util.Properties; + +/** + * Defines a reference to a table in a particular format. + */ @Getter @EqualsAndHashCode class ExternalTable { - @NonNull String name; - @NonNull String formatName; - @NonNull String metadataPath; - String[] namespace; - CatalogConfig catalogConfig; + /** + * The name of the table. + */ + protected final @NonNull String name; + /** + * The format of the table (e.g. DELTA, ICEBERG, HUDI) + */ + protected final @NonNull String formatName; + /** + * The path to the root of the table or the metadata directory depending on the format + */ + protected final @NonNull String basePath; + /** + * Optional namespace for the table + */ + protected final String[] namespace; + /** + * The configuration for interacting with the catalog that manages this table + */ + protected final CatalogConfig catalogConfig; + + /** + * Optional, additional properties that can be used to define interactions with the table + */ + protected final Properties additionalProperties; ExternalTable( @NonNull String name, @NonNull String formatName, @NonNull String basePath, String[] namespace, - CatalogConfig catalogConfig) { + CatalogConfig catalogConfig, + Properties additionalProperties) { this.name = name; this.formatName = formatName; - this.metadataPath = sanitizeBasePath(basePath); + this.basePath = sanitizeBasePath(basePath); this.namespace = namespace; this.catalogConfig = catalogConfig; + this.additionalProperties = additionalProperties; } protected String sanitizeBasePath(String tableBasePath) { diff --git a/xtable-api/src/main/java/org/apache/xtable/conversion/SourceTable.java 
b/xtable-api/src/main/java/org/apache/xtable/conversion/SourceTable.java index b219cf12..37224476 100644 --- a/xtable-api/src/main/java/org/apache/xtable/conversion/SourceTable.java +++ b/xtable-api/src/main/java/org/apache/xtable/conversion/SourceTable.java @@ -23,20 +23,26 @@ import lombok.EqualsAndHashCode; import lombok.Getter; import lombok.NonNull; +import java.util.Properties; + @EqualsAndHashCode(callSuper = true) @Getter public class SourceTable extends ExternalTable { - @NonNull String dataPath; + /** + * The path to the data files, defaults to the basePath + */ + @NonNull private final String dataPath; @Builder(toBuilder = true) public SourceTable( String name, String formatName, - String metadataPath, + String basePath, String dataPath, String[] namespace, - CatalogConfig catalogConfig) { - super(name, formatName, metadataPath, namespace, catalogConfig); - this.dataPath = dataPath == null ? this.metadataPath : sanitizeBasePath(dataPath); + CatalogConfig catalogConfig, + Properties additionalProperties) { + super(name, formatName, basePath, namespace, catalogConfig, additionalProperties); + this.dataPath = dataPath == null ? 
this.getBasePath() : sanitizeBasePath(dataPath); } } diff --git a/xtable-api/src/main/java/org/apache/xtable/conversion/TableSyncConfig.java b/xtable-api/src/main/java/org/apache/xtable/conversion/TableSyncConfig.java index a56ec300..81ba99c3 100644 --- a/xtable-api/src/main/java/org/apache/xtable/conversion/TableSyncConfig.java +++ b/xtable-api/src/main/java/org/apache/xtable/conversion/TableSyncConfig.java @@ -38,21 +38,17 @@ public class TableSyncConfig { List<TargetTable> targetTables; // The mode, incremental or snapshot SyncMode syncMode; - // Additional properties to be used when initializing the conversion source - Map<String, String> properties; @Builder TableSyncConfig( @NonNull SourceTable sourceTable, List<TargetTable> targetTables, - SyncMode syncMode, - Map<String, String> properties) { + SyncMode syncMode) { this.sourceTable = sourceTable; this.targetTables = targetTables; Preconditions.checkArgument( targetTables != null && !targetTables.isEmpty(), "Please provide at-least one format to sync"); this.syncMode = syncMode == null ? SyncMode.INCREMENTAL : syncMode; - this.properties = properties == null ? 
Collections.emptyMap() : properties; } } diff --git a/xtable-api/src/main/java/org/apache/xtable/conversion/TargetTable.java b/xtable-api/src/main/java/org/apache/xtable/conversion/TargetTable.java index 067eb8a9..6256da2c 100644 --- a/xtable-api/src/main/java/org/apache/xtable/conversion/TargetTable.java +++ b/xtable-api/src/main/java/org/apache/xtable/conversion/TargetTable.java @@ -20,6 +20,7 @@ package org.apache.xtable.conversion; import java.time.Duration; import java.time.temporal.ChronoUnit; +import java.util.Properties; import lombok.Builder; import lombok.EqualsAndHashCode; @@ -28,17 +29,18 @@ import lombok.Getter; @Getter @EqualsAndHashCode(callSuper = true) public class TargetTable extends ExternalTable { - Duration metadataRetention; + private final Duration metadataRetention; @Builder(toBuilder = true) public TargetTable( String name, String formatName, - String metadataPath, + String basePath, String[] namespace, CatalogConfig catalogConfig, - Duration metadataRetention) { - super(name, formatName, metadataPath, namespace, catalogConfig); + Duration metadataRetention, + Properties additionalProperties) { + super(name, formatName, basePath, namespace, catalogConfig, additionalProperties); this.metadataRetention = metadataRetention == null ? 
Duration.of(7, ChronoUnit.DAYS) : metadataRetention; } diff --git a/xtable-api/src/test/java/org/apache/xtable/conversion/TestExternalTable.java b/xtable-api/src/test/java/org/apache/xtable/conversion/TestExternalTable.java index 6345baf0..6bc42da4 100644 --- a/xtable-api/src/test/java/org/apache/xtable/conversion/TestExternalTable.java +++ b/xtable-api/src/test/java/org/apache/xtable/conversion/TestExternalTable.java @@ -27,29 +27,29 @@ public class TestExternalTable { @Test void sanitizePath() { ExternalTable tooManySlashes = - new ExternalTable("name", "hudi", "s3://bucket//path", null, null); - assertEquals("s3://bucket/path", tooManySlashes.getMetadataPath()); + new ExternalTable("name", "hudi", "s3://bucket//path", null, null, null); + assertEquals("s3://bucket/path", tooManySlashes.getBasePath()); ExternalTable localFilePath = - new ExternalTable("name", "hudi", "/local/data//path", null, null); - assertEquals("file:///local/data/path", localFilePath.getMetadataPath()); + new ExternalTable("name", "hudi", "/local/data//path", null, null, null); + assertEquals("file:///local/data/path", localFilePath.getBasePath()); ExternalTable properLocalFilePath = - new ExternalTable("name", "hudi", "file:///local/data//path", null, null); - assertEquals("file:///local/data/path", properLocalFilePath.getMetadataPath()); + new ExternalTable("name", "hudi", "file:///local/data//path", null, null, null); + assertEquals("file:///local/data/path", properLocalFilePath.getBasePath()); } @Test void errorIfRequiredArgsNotSet() { assertThrows( - NullPointerException.class, () -> new ExternalTable("name", "hudi", null, null, null)); + NullPointerException.class, () -> new ExternalTable("name", "hudi", null, null, null, null)); assertThrows( NullPointerException.class, - () -> new ExternalTable("name", null, "file://bucket/path", null, null)); + () -> new ExternalTable("name", null, "file://bucket/path", null, null, null)); assertThrows( NullPointerException.class, - () -> new 
ExternalTable(null, "hudi", "file://bucket/path", null, null)); + () -> new ExternalTable(null, "hudi", "file://bucket/path", null, null, null)); } } diff --git a/xtable-api/src/test/java/org/apache/xtable/conversion/TestSourceTable.java b/xtable-api/src/test/java/org/apache/xtable/conversion/TestSourceTable.java index 42ea63da..05effaf9 100644 --- a/xtable-api/src/test/java/org/apache/xtable/conversion/TestSourceTable.java +++ b/xtable-api/src/test/java/org/apache/xtable/conversion/TestSourceTable.java @@ -25,21 +25,21 @@ import org.junit.jupiter.api.Test; class TestSourceTable { @Test void dataPathDefaultsToMetadataPath() { - String metadataPath = "file:///path/to/table"; + String basePath = "file:///path/to/table"; SourceTable sourceTable = - SourceTable.builder().name("name").formatName("hudi").metadataPath(metadataPath).build(); - assertEquals(metadataPath, sourceTable.getDataPath()); + SourceTable.builder().name("name").formatName("hudi").basePath(basePath).build(); + assertEquals(basePath, sourceTable.getDataPath()); } @Test void dataPathIsSanitized() { - String metadataPath = "file:///path/to/table"; + String basePath = "file:///path/to/table"; String dataPath = "file:///path/to/table//data"; SourceTable sourceTable = SourceTable.builder() .name("name") .formatName("hudi") - .metadataPath(metadataPath) + .basePath(basePath) .dataPath(dataPath) .build(); assertEquals("file:///path/to/table/data", sourceTable.getDataPath()); diff --git a/xtable-api/src/test/java/org/apache/xtable/conversion/TestTargetTable.java b/xtable-api/src/test/java/org/apache/xtable/conversion/TestTargetTable.java index 2a40a8a1..9faa22c8 100644 --- a/xtable-api/src/test/java/org/apache/xtable/conversion/TestTargetTable.java +++ b/xtable-api/src/test/java/org/apache/xtable/conversion/TestTargetTable.java @@ -27,9 +27,9 @@ import org.junit.jupiter.api.Test; class TestTargetTable { @Test void retentionDefaultsToSevenDays() { - String metadataPath = "file:///path/to/table"; + String 
basePath = "file:///path/to/table"; TargetTable targetTable = - TargetTable.builder().name("name").formatName("hudi").metadataPath(metadataPath).build(); + TargetTable.builder().name("name").formatName("hudi").basePath(basePath).build(); assertEquals(Duration.ofDays(7), targetTable.getMetadataRetention()); } } diff --git a/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionController.java b/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionController.java index fdfdeb6e..17911134 100644 --- a/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionController.java +++ b/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionController.java @@ -87,8 +87,7 @@ public class ConversionController { } try (ConversionSource<COMMIT> conversionSource = - conversionSourceProvider.getConversionSourceInstance( - config.getSourceTable(), config.getProperties())) { + conversionSourceProvider.getConversionSourceInstance(config.getSourceTable())) { ExtractFromSource<COMMIT> source = ExtractFromSource.of(conversionSource); Map<String, ConversionTarget> conversionTargetByFormat = diff --git a/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionSourceProvider.java b/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionSourceProvider.java index e9146f50..ccd6fde7 100644 --- a/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionSourceProvider.java +++ b/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionSourceProvider.java @@ -18,8 +18,6 @@ package org.apache.xtable.conversion; -import java.util.Map; - import org.apache.hadoop.conf.Configuration; import org.apache.xtable.spi.extractor.ConversionSource; @@ -47,5 +45,5 @@ public abstract class ConversionSourceProvider<COMMIT> { * @return the conversion source */ public abstract ConversionSource<COMMIT> getConversionSourceInstance( - SourceTable sourceTableConfig, Map<String, String> clientConf); + SourceTable sourceTableConfig); } diff --git 
a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionSourceProvider.java b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionSourceProvider.java index 53d3bf54..2ba79a88 100644 --- a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionSourceProvider.java +++ b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionSourceProvider.java @@ -30,14 +30,13 @@ import org.apache.xtable.conversion.SourceTable; /** A concrete implementation of {@link ConversionSourceProvider} for Delta Lake table format. */ public class DeltaConversionSourceProvider extends ConversionSourceProvider<Long> { @Override - public DeltaConversionSource getConversionSourceInstance( - SourceTable sourceTable, Map<String, String> clientConf) { + public DeltaConversionSource getConversionSourceInstance(SourceTable sourceTable) { SparkSession sparkSession = DeltaConversionUtils.buildSparkSession(hadoopConf); - DeltaTable deltaTable = DeltaTable.forPath(sparkSession, sourceTable.getMetadataPath()); + DeltaTable deltaTable = DeltaTable.forPath(sparkSession, sourceTable.getBasePath()); return DeltaConversionSource.builder() .sparkSession(sparkSession) .tableName(sourceTable.getName()) - .basePath(sourceTable.getMetadataPath()) + .basePath(sourceTable.getBasePath()) .deltaTable(deltaTable) .deltaLog(deltaTable.deltaLog()) .build(); diff --git a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionTarget.java b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionTarget.java index 6dc9a772..b34fa449 100644 --- a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionTarget.java +++ b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionTarget.java @@ -83,7 +83,7 @@ public class DeltaConversionTarget implements ConversionTarget { public DeltaConversionTarget(TargetTable targetTable, SparkSession sparkSession) { this( - targetTable.getMetadataPath(), + targetTable.getBasePath(), targetTable.getName(), 
targetTable.getMetadataRetention().toHours(), sparkSession, @@ -138,7 +138,7 @@ public class DeltaConversionTarget implements ConversionTarget { SparkSession sparkSession = DeltaConversionUtils.buildSparkSession(configuration); _init( - targetTable.getMetadataPath(), + targetTable.getBasePath(), targetTable.getName(), targetTable.getMetadataRetention().toHours(), sparkSession, diff --git a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionSourceProvider.java b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionSourceProvider.java index 773113ed..51c0afb9 100644 --- a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionSourceProvider.java +++ b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionSourceProvider.java @@ -18,8 +18,6 @@ package org.apache.xtable.hudi; -import java.util.Map; - import lombok.extern.log4j.Log4j2; import org.apache.hudi.common.model.HoodieTableType; @@ -35,11 +33,11 @@ public class HudiConversionSourceProvider extends ConversionSourceProvider<Hoodi @Override public HudiConversionSource getConversionSourceInstance( - SourceTable sourceTable, Map<String, String> clientConfigs) { + SourceTable sourceTable) { HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder() .setConf(hadoopConf) - .setBasePath(sourceTable.getMetadataPath()) + .setBasePath(sourceTable.getBasePath()) .setLoadActiveTimelineOnLoad(true) .build(); if (!metaClient.getTableConfig().getTableType().equals(HoodieTableType.COPY_ON_WRITE)) { @@ -47,7 +45,7 @@ public class HudiConversionSourceProvider extends ConversionSourceProvider<Hoodi } final HudiSourcePartitionSpecExtractor sourcePartitionSpecExtractor = - HudiSourceConfig.fromProperties(clientConfigs).loadSourcePartitionSpecExtractor(); + HudiSourceConfig.fromProperties(sourceTable.getAdditionalProperties()).loadSourcePartitionSpecExtractor(); return new HudiConversionSource(metaClient, sourcePartitionSpecExtractor); } diff --git 
a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionTarget.java b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionTarget.java index e95391ad..c1852ea8 100644 --- a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionTarget.java +++ b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionTarget.java @@ -112,12 +112,12 @@ public class HudiConversionTarget implements ConversionTarget { Configuration configuration, int maxNumDeltaCommitsBeforeCompaction) { this( - targetTable.getMetadataPath(), + targetTable.getBasePath(), (int) targetTable.getMetadataRetention().toHours(), maxNumDeltaCommitsBeforeCompaction, BaseFileUpdatesExtractor.of( new HoodieJavaEngineContext(configuration), - new CachingPath(targetTable.getMetadataPath())), + new CachingPath(targetTable.getBasePath())), AvroSchemaConverter.getInstance(), HudiTableManager.of(configuration), CommitState::new); @@ -165,12 +165,12 @@ public class HudiConversionTarget implements ConversionTarget { @Override public void init(TargetTable targetTable, Configuration configuration) { _init( - targetTable.getMetadataPath(), + targetTable.getBasePath(), (int) targetTable.getMetadataRetention().toHours(), HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.defaultValue(), BaseFileUpdatesExtractor.of( new HoodieJavaEngineContext(configuration), - new CachingPath(targetTable.getMetadataPath())), + new CachingPath(targetTable.getBasePath())), AvroSchemaConverter.getInstance(), HudiTableManager.of(configuration), CommitState::new); diff --git a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiSourceConfig.java b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiSourceConfig.java index 944cd221..88717d1e 100644 --- a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiSourceConfig.java +++ b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiSourceConfig.java @@ -23,6 +23,7 @@ import java.util.Collections; import java.util.List; import java.util.Locale; import java.util.Map; 
+import java.util.Properties; import lombok.Value; @@ -48,12 +49,12 @@ public class HudiSourceConfig { parsePartitionFieldSpecs(partitionFieldSpecConfig)); } - public static HudiSourceConfig fromProperties(Map<String, String> properties) { + public static HudiSourceConfig fromProperties(Properties properties) { String partitionSpecExtractorClass = - properties.getOrDefault( + properties.getProperty( PARTITION_SPEC_EXTRACTOR_CLASS, ConfigurationBasedPartitionSpecExtractor.class.getName()); - String partitionFieldSpecString = properties.get(PARTITION_FIELD_SPEC_CONFIG); + String partitionFieldSpecString = properties.getProperty(PARTITION_FIELD_SPEC_CONFIG); List<PartitionFieldSpec> partitionFieldSpecs = parsePartitionFieldSpecs(partitionFieldSpecString); return new HudiSourceConfig(partitionSpecExtractorClass, partitionFieldSpecs); diff --git a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSource.java b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSource.java index 05434b6c..f96ec714 100644 --- a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSource.java +++ b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSource.java @@ -96,7 +96,7 @@ public class IcebergConversionSource implements ConversionSource<Snapshot> { return tableManager.getTable( (IcebergCatalogConfig) sourceTableConfig.getCatalogConfig(), tableIdentifier, - sourceTableConfig.getMetadataPath()); + sourceTableConfig.getBasePath()); } private FileIO initTableOps() { diff --git a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSourceProvider.java b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSourceProvider.java index 8873caab..449ebe5d 100644 --- a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSourceProvider.java +++ b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSourceProvider.java @@ -18,8 +18,6 @@ package 
org.apache.xtable.iceberg; -import java.util.Map; - import org.apache.iceberg.Snapshot; import org.apache.xtable.conversion.ConversionSourceProvider; @@ -28,8 +26,7 @@ import org.apache.xtable.conversion.SourceTable; /** A concrete implementation of {@link ConversionSourceProvider} for Hudi table format. */ public class IcebergConversionSourceProvider extends ConversionSourceProvider<Snapshot> { @Override - public IcebergConversionSource getConversionSourceInstance( - SourceTable sourceTableConfig, Map<String, String> clientConf) { + public IcebergConversionSource getConversionSourceInstance(SourceTable sourceTableConfig) { return IcebergConversionSource.builder() .sourceTableConfig(sourceTableConfig) .hadoopConf(hadoopConf) diff --git a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionTarget.java b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionTarget.java index fe9ddb37..ecdbfa26 100644 --- a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionTarget.java +++ b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionTarget.java @@ -104,7 +104,7 @@ public class IcebergConversionTarget implements ConversionTarget { this.partitionSpecSync = partitionSpecSync; this.dataFileUpdatesExtractor = dataFileUpdatesExtractor; String tableName = targetTable.getName(); - this.basePath = targetTable.getMetadataPath(); + this.basePath = targetTable.getBasePath(); this.configuration = configuration; this.snapshotRetentionInHours = (int) targetTable.getMetadataRetention().toHours(); String[] namespace = targetTable.getNamespace(); diff --git a/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java b/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java index 58561c05..f17ad1be 100644 --- a/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java +++ b/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java @@ -43,6 +43,7 @@ import 
java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Properties; import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -852,12 +853,15 @@ public class ITConversionController { List<String> targetTableFormats, String partitionConfig, Duration metadataRetention) { + Properties sourceProperties = new Properties(); + sourceProperties.put(PARTITION_FIELD_SPEC_CONFIG, partitionConfig); SourceTable sourceTable = SourceTable.builder() .name(tableName) .formatName(sourceTableFormat) - .metadataPath(table.getBasePath()) + .basePath(table.getBasePath()) .dataPath(table.getDataPath()) + .additionalProperties(sourceProperties) .build(); List<TargetTable> targetTables = @@ -868,7 +872,7 @@ public class ITConversionController { .name(tableName) .formatName(formatName) // set the metadata path to the data path as the default (required by Hudi) - .metadataPath(table.getDataPath()) + .basePath(table.getDataPath()) .metadataRetention(metadataRetention) .build()) .collect(Collectors.toList()); @@ -877,7 +881,6 @@ public class ITConversionController { .sourceTable(sourceTable) .targetTables(targetTables) .syncMode(syncMode) - .properties(Collections.singletonMap(PARTITION_FIELD_SPEC_CONFIG, partitionConfig)) .build(); } } diff --git a/xtable-core/src/test/java/org/apache/xtable/conversion/TestConversionController.java b/xtable-core/src/test/java/org/apache/xtable/conversion/TestConversionController.java index d26accc1..6c5003e4 100644 --- a/xtable-core/src/test/java/org/apache/xtable/conversion/TestConversionController.java +++ b/xtable-core/src/test/java/org/apache/xtable/conversion/TestConversionController.java @@ -82,7 +82,7 @@ public class TestConversionController { TableSyncConfig tableSyncConfig = getTableSyncConfig(Arrays.asList(TableFormat.ICEBERG, TableFormat.DELTA), syncMode); when(mockConversionSourceProvider.getConversionSourceInstance( - 
tableSyncConfig.getSourceTable(), tableSyncConfig.getProperties())) + tableSyncConfig.getSourceTable())) .thenReturn(mockConversionSource); when(mockConversionTargetFactory.createForFormat( tableSyncConfig.getTargetTables().get(0), mockConf)) @@ -108,7 +108,7 @@ public class TestConversionController { TableSyncConfig tableSyncConfig = getTableSyncConfig(Arrays.asList(TableFormat.ICEBERG, TableFormat.DELTA), syncMode); when(mockConversionSourceProvider.getConversionSourceInstance( - tableSyncConfig.getSourceTable(), tableSyncConfig.getProperties())) + tableSyncConfig.getSourceTable())) .thenReturn(mockConversionSource); when(mockConversionTargetFactory.createForFormat( tableSyncConfig.getTargetTables().get(0), mockConf)) @@ -201,7 +201,7 @@ public class TestConversionController { TableSyncConfig tableSyncConfig = getTableSyncConfig(Arrays.asList(TableFormat.ICEBERG, TableFormat.DELTA), syncMode); when(mockConversionSourceProvider.getConversionSourceInstance( - tableSyncConfig.getSourceTable(), tableSyncConfig.getProperties())) + tableSyncConfig.getSourceTable())) .thenReturn(mockConversionSource); when(mockConversionTargetFactory.createForFormat( tableSyncConfig.getTargetTables().get(0), mockConf)) @@ -238,7 +238,7 @@ public class TestConversionController { TableSyncConfig tableSyncConfig = getTableSyncConfig(Arrays.asList(TableFormat.ICEBERG, TableFormat.DELTA), syncMode); when(mockConversionSourceProvider.getConversionSourceInstance( - tableSyncConfig.getSourceTable(), tableSyncConfig.getProperties())) + tableSyncConfig.getSourceTable())) .thenReturn(mockConversionSource); when(mockConversionTargetFactory.createForFormat( tableSyncConfig.getTargetTables().get(0), mockConf)) @@ -322,7 +322,7 @@ public class TestConversionController { TableSyncConfig tableSyncConfig = getTableSyncConfig(Arrays.asList(TableFormat.ICEBERG, TableFormat.DELTA), syncMode); when(mockConversionSourceProvider.getConversionSourceInstance( - tableSyncConfig.getSourceTable(), 
tableSyncConfig.getProperties())) + tableSyncConfig.getSourceTable())) .thenReturn(mockConversionSource); when(mockConversionTargetFactory.createForFormat( tableSyncConfig.getTargetTables().get(0), mockConf)) diff --git a/xtable-core/src/test/java/org/apache/xtable/conversion/TestTableSyncConfig.java b/xtable-core/src/test/java/org/apache/xtable/conversion/TestTableSyncConfig.java index 66698c06..6c927afe 100644 --- a/xtable-core/src/test/java/org/apache/xtable/conversion/TestTableSyncConfig.java +++ b/xtable-core/src/test/java/org/apache/xtable/conversion/TestTableSyncConfig.java @@ -38,7 +38,6 @@ class TestTableSyncConfig { .targetTables(Collections.singletonList(mock(TargetTable.class))) .build(); - assertEquals(Collections.emptyMap(), tableSyncConfig.getProperties()); assertEquals(SyncMode.INCREMENTAL, tableSyncConfig.getSyncMode()); } diff --git a/xtable-core/src/test/java/org/apache/xtable/delta/ITDeltaConversionTargetSource.java b/xtable-core/src/test/java/org/apache/xtable/delta/ITDeltaConversionTargetSource.java index a1e0d675..21152787 100644 --- a/xtable-core/src/test/java/org/apache/xtable/delta/ITDeltaConversionTargetSource.java +++ b/xtable-core/src/test/java/org/apache/xtable/delta/ITDeltaConversionTargetSource.java @@ -167,11 +167,11 @@ public class ITDeltaConversionTargetSource { SourceTable tableConfig = SourceTable.builder() .name(tableName) - .metadataPath(basePath.toString()) + .basePath(basePath.toString()) .formatName(TableFormat.DELTA) .build(); DeltaConversionSource conversionSource = - conversionSourceProvider.getConversionSourceInstance(tableConfig, Collections.emptyMap()); + conversionSourceProvider.getConversionSourceInstance(tableConfig); // Get current snapshot InternalSnapshot snapshot = conversionSource.getCurrentSnapshot(); // Validate table @@ -225,11 +225,11 @@ public class ITDeltaConversionTargetSource { SourceTable tableConfig = SourceTable.builder() .name(tableName) - .metadataPath(basePath.toString()) + 
.basePath(basePath.toString()) .formatName(TableFormat.DELTA) .build(); DeltaConversionSource conversionSource = - conversionSourceProvider.getConversionSourceInstance(tableConfig, Collections.emptyMap()); + conversionSourceProvider.getConversionSourceInstance(tableConfig); // Get current snapshot InternalSnapshot snapshot = conversionSource.getCurrentSnapshot(); // Validate table @@ -313,11 +313,11 @@ public class ITDeltaConversionTargetSource { SourceTable tableConfig = SourceTable.builder() .name(tableName) - .metadataPath(basePath.toString()) + .basePath(basePath.toString()) .formatName(TableFormat.DELTA) .build(); DeltaConversionSource conversionSource = - conversionSourceProvider.getConversionSourceInstance(tableConfig, Collections.emptyMap()); + conversionSourceProvider.getConversionSourceInstance(tableConfig); // Get current snapshot InternalSnapshot snapshot = conversionSource.getCurrentSnapshot(); } @@ -352,11 +352,11 @@ public class ITDeltaConversionTargetSource { SourceTable tableConfig = SourceTable.builder() .name(testSparkDeltaTable.getTableName()) - .metadataPath(testSparkDeltaTable.getBasePath()) + .basePath(testSparkDeltaTable.getBasePath()) .formatName(TableFormat.DELTA) .build(); DeltaConversionSource conversionSource = - conversionSourceProvider.getConversionSourceInstance(tableConfig, Collections.emptyMap()); + conversionSourceProvider.getConversionSourceInstance(tableConfig); assertEquals(180L, testSparkDeltaTable.getNumRows()); InternalSnapshot internalSnapshot = conversionSource.getCurrentSnapshot(); @@ -392,11 +392,11 @@ public class ITDeltaConversionTargetSource { SourceTable tableConfig = SourceTable.builder() .name(testSparkDeltaTable.getTableName()) - .metadataPath(testSparkDeltaTable.getBasePath()) + .basePath(testSparkDeltaTable.getBasePath()) .formatName(TableFormat.DELTA) .build(); DeltaConversionSource conversionSource = - conversionSourceProvider.getConversionSourceInstance(tableConfig, Collections.emptyMap()); + 
conversionSourceProvider.getConversionSourceInstance(tableConfig); InternalSnapshot snapshotAfterCommit1 = conversionSource.getCurrentSnapshot(); List<String> allActivePaths = ValidationTestHelper.getAllFilePaths(snapshotAfterCommit1); assertEquals(1, allActivePaths.size()); @@ -416,7 +416,7 @@ public class ITDeltaConversionTargetSource { .lastSyncInstant(Instant.ofEpochMilli(timestamp1)) .build(); conversionSource = - conversionSourceProvider.getConversionSourceInstance(tableConfig, Collections.emptyMap()); + conversionSourceProvider.getConversionSourceInstance(tableConfig); CommitsBacklog<Long> instantCurrentCommitState = conversionSource.getCommitsBacklog(instantsForIncrementalSync); boolean areFilesRemoved = false; @@ -461,11 +461,11 @@ public class ITDeltaConversionTargetSource { SourceTable tableConfig = SourceTable.builder() .name(testSparkDeltaTable.getTableName()) - .metadataPath(testSparkDeltaTable.getBasePath()) + .basePath(testSparkDeltaTable.getBasePath()) .formatName(TableFormat.DELTA) .build(); DeltaConversionSource conversionSource = - conversionSourceProvider.getConversionSourceInstance(tableConfig, Collections.emptyMap()); + conversionSourceProvider.getConversionSourceInstance(tableConfig); assertEquals(130L, testSparkDeltaTable.getNumRows()); InternalSnapshot internalSnapshot = conversionSource.getCurrentSnapshot(); if (isPartitioned) { @@ -509,11 +509,11 @@ public class ITDeltaConversionTargetSource { SourceTable tableConfig = SourceTable.builder() .name(testSparkDeltaTable.getTableName()) - .metadataPath(testSparkDeltaTable.getBasePath()) + .basePath(testSparkDeltaTable.getBasePath()) .formatName(TableFormat.DELTA) .build(); DeltaConversionSource conversionSource = - conversionSourceProvider.getConversionSourceInstance(tableConfig, Collections.emptyMap()); + conversionSourceProvider.getConversionSourceInstance(tableConfig); assertEquals(150L, testSparkDeltaTable.getNumRows()); InternalSnapshot internalSnapshot = 
conversionSource.getCurrentSnapshot(); if (isPartitioned) { @@ -566,11 +566,11 @@ public class ITDeltaConversionTargetSource { SourceTable tableConfig = SourceTable.builder() .name(testSparkDeltaTable.getTableName()) - .metadataPath(testSparkDeltaTable.getBasePath()) + .basePath(testSparkDeltaTable.getBasePath()) .formatName(TableFormat.DELTA) .build(); DeltaConversionSource conversionSource = - conversionSourceProvider.getConversionSourceInstance(tableConfig, Collections.emptyMap()); + conversionSourceProvider.getConversionSourceInstance(tableConfig); assertEquals( 120 - rowsByPartition.get(partitionValueToDelete).size(), testSparkDeltaTable.getNumRows()); InternalSnapshot internalSnapshot = conversionSource.getCurrentSnapshot(); @@ -626,11 +626,11 @@ public class ITDeltaConversionTargetSource { SourceTable tableConfig = SourceTable.builder() .name(testSparkDeltaTable.getTableName()) - .metadataPath(testSparkDeltaTable.getBasePath()) + .basePath(testSparkDeltaTable.getBasePath()) .formatName(TableFormat.DELTA) .build(); DeltaConversionSource conversionSource = - conversionSourceProvider.getConversionSourceInstance(tableConfig, Collections.emptyMap()); + conversionSourceProvider.getConversionSourceInstance(tableConfig); assertEquals(250L, testSparkDeltaTable.getNumRows()); InternalSnapshot internalSnapshot = conversionSource.getCurrentSnapshot(); if (isPartitioned) { diff --git a/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaSync.java b/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaSync.java index 04132c84..f0f889d2 100644 --- a/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaSync.java +++ b/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaSync.java @@ -130,7 +130,7 @@ public class TestDeltaSync { new DeltaConversionTarget( TargetTable.builder() .name(tableName) - .metadataPath(basePath.toString()) + .basePath(basePath.toString()) .metadataRetention(Duration.of(1, ChronoUnit.HOURS)) .formatName(TableFormat.DELTA) 
.build(), diff --git a/xtable-core/src/test/java/org/apache/xtable/hudi/ITHudiConversionSourceTarget.java b/xtable-core/src/test/java/org/apache/xtable/hudi/ITHudiConversionSourceTarget.java index 5588def1..12885567 100644 --- a/xtable-core/src/test/java/org/apache/xtable/hudi/ITHudiConversionSourceTarget.java +++ b/xtable-core/src/test/java/org/apache/xtable/hudi/ITHudiConversionSourceTarget.java @@ -584,7 +584,7 @@ public class ITHudiConversionSourceTarget { private HudiConversionTarget getTargetClient() { return new HudiConversionTarget( TargetTable.builder() - .metadataPath(tableBasePath) + .basePath(tableBasePath) .formatName(TableFormat.HUDI) .name("test_table") .metadataRetention(Duration.of(4, ChronoUnit.HOURS)) diff --git a/xtable-core/src/test/java/org/apache/xtable/iceberg/ITIcebergConversionTargetSource.java b/xtable-core/src/test/java/org/apache/xtable/iceberg/ITIcebergConversionTargetSource.java index 5c3e1628..3f20ac9d 100644 --- a/xtable-core/src/test/java/org/apache/xtable/iceberg/ITIcebergConversionTargetSource.java +++ b/xtable-core/src/test/java/org/apache/xtable/iceberg/ITIcebergConversionTargetSource.java @@ -29,7 +29,6 @@ import java.nio.file.Path; import java.time.Instant; import java.time.temporal.ChronoUnit; import java.util.ArrayList; -import java.util.Collections; import java.util.List; import java.util.Map; @@ -99,11 +98,11 @@ public class ITIcebergConversionTargetSource { SourceTable tableConfig = SourceTable.builder() .name(testIcebergTable.getTableName()) - .metadataPath(testIcebergTable.getBasePath()) + .basePath(testIcebergTable.getBasePath()) .formatName(TableFormat.ICEBERG) .build(); IcebergConversionSource conversionSource = - sourceProvider.getConversionSourceInstance(tableConfig, Collections.emptyMap()); + sourceProvider.getConversionSourceInstance(tableConfig); assertEquals(180L, testIcebergTable.getNumRows()); InternalSnapshot internalSnapshot = conversionSource.getCurrentSnapshot(); @@ -159,11 +158,11 @@ public class 
ITIcebergConversionTargetSource { SourceTable tableConfig = SourceTable.builder() .name(testIcebergTable.getTableName()) - .metadataPath(testIcebergTable.getBasePath()) + .basePath(testIcebergTable.getBasePath()) .formatName(TableFormat.ICEBERG) .build(); IcebergConversionSource conversionSource = - sourceProvider.getConversionSourceInstance(tableConfig, Collections.emptyMap()); + sourceProvider.getConversionSourceInstance(tableConfig); assertEquals( 120 - recordsByPartition.get(partitionValueToDelete).size(), testIcebergTable.getNumRows()); @@ -219,11 +218,11 @@ public class ITIcebergConversionTargetSource { SourceTable tableConfig = SourceTable.builder() .name(testIcebergTable.getTableName()) - .metadataPath(testIcebergTable.getBasePath()) + .basePath(testIcebergTable.getBasePath()) .formatName(TableFormat.ICEBERG) .build(); IcebergConversionSource conversionSource = - sourceProvider.getConversionSourceInstance(tableConfig, Collections.emptyMap()); + sourceProvider.getConversionSourceInstance(tableConfig); assertEquals( 120 - recordsByPartition.get(partitionValueToDelete).size(), testIcebergTable.getNumRows()); @@ -279,11 +278,11 @@ public class ITIcebergConversionTargetSource { SourceTable tableConfig = SourceTable.builder() .name(testIcebergTable.getTableName()) - .metadataPath(testIcebergTable.getBasePath()) + .basePath(testIcebergTable.getBasePath()) .formatName(TableFormat.ICEBERG) .build(); IcebergConversionSource conversionSource = - sourceProvider.getConversionSourceInstance(tableConfig, Collections.emptyMap()); + sourceProvider.getConversionSourceInstance(tableConfig); assertEquals(200L, testIcebergTable.getNumRows()); InternalSnapshot internalSnapshot = conversionSource.getCurrentSnapshot(); @@ -320,7 +319,7 @@ public class ITIcebergConversionTargetSource { SourceTable tableConfig = SourceTable.builder() .name(testIcebergTable.getTableName()) - .metadataPath(testIcebergTable.getBasePath()) + .basePath(testIcebergTable.getBasePath()) 
.formatName(TableFormat.ICEBERG) .build(); @@ -336,7 +335,7 @@ public class ITIcebergConversionTargetSource { testIcebergTable.expireSnapshot(snapshotIdAfterCommit2); } IcebergConversionSource conversionSource = - sourceProvider.getConversionSourceInstance(tableConfig, Collections.emptyMap()); + sourceProvider.getConversionSourceInstance(tableConfig); if (shouldExpireSnapshots) { assertFalse(conversionSource.isIncrementalSyncSafeFrom(Instant.ofEpochMilli(timestamp1))); } else { diff --git a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergConversionTargetSource.java b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergConversionTargetSource.java index 021d5071..9dd4ffc7 100644 --- a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergConversionTargetSource.java +++ b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergConversionTargetSource.java @@ -26,7 +26,6 @@ import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; import java.time.Instant; -import java.util.Collections; import java.util.List; import java.util.Map; import java.util.UUID; @@ -93,7 +92,7 @@ class TestIcebergConversionTargetSource { SourceTable sourceTableConfig = getPerTableConfig(catalogSales); IcebergConversionSource conversionSource = - sourceProvider.getConversionSourceInstance(sourceTableConfig, Collections.emptyMap()); + sourceProvider.getConversionSourceInstance(sourceTableConfig); Snapshot snapshot = catalogSales.currentSnapshot(); InternalTable internalTable = conversionSource.getTable(snapshot); diff --git a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergSync.java b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergSync.java index d82c15ac..bd36dde9 100644 --- a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergSync.java +++ b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergSync.java @@ -192,7 +192,7 @@ public class TestIcebergSync { private 
IcebergConversionTarget getConversionTarget() { return new IcebergConversionTarget( TargetTable.builder() - .metadataPath(basePath.toString()) + .basePath(basePath.toString()) .name(tableName) .metadataRetention(Duration.of(1, ChronoUnit.HOURS)) .formatName(TableFormat.ICEBERG) diff --git a/xtable-core/src/test/java/org/apache/xtable/loadtest/LoadTest.java b/xtable-core/src/test/java/org/apache/xtable/loadtest/LoadTest.java index 2361d30b..df135db7 100644 --- a/xtable-core/src/test/java/org/apache/xtable/loadtest/LoadTest.java +++ b/xtable-core/src/test/java/org/apache/xtable/loadtest/LoadTest.java @@ -24,6 +24,7 @@ import java.nio.file.Path; import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.Properties; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -135,12 +136,15 @@ public class LoadTest { private static TableSyncConfig getTableSyncConfig( SyncMode syncMode, String tableName, GenericTable table, List<String> targetTableFormats) { + Properties sourceProperties = new Properties(); + sourceProperties.put(PARTITION_FIELD_SPEC_CONFIG, "level:VALUE"); SourceTable sourceTable = SourceTable.builder() .name(tableName) .formatName(TableFormat.HUDI) - .metadataPath(table.getBasePath()) + .basePath(table.getBasePath()) .dataPath(table.getDataPath()) + .additionalProperties(sourceProperties) .build(); List<TargetTable> targetTables = @@ -150,7 +154,7 @@ public class LoadTest { TargetTable.builder() .name(tableName) .formatName(formatName) - .metadataPath(table.getBasePath()) + .basePath(table.getBasePath()) .build()) .collect(Collectors.toList()); @@ -158,7 +162,6 @@ public class LoadTest { .sourceTable(sourceTable) .targetTables(targetTables) .syncMode(syncMode) - .properties(Collections.singletonMap(PARTITION_FIELD_SPEC_CONFIG, "level:VALUE")) .build(); } } diff --git a/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunSync.java 
b/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunSync.java index ffa617e0..16821a96 100644 --- a/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunSync.java +++ b/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunSync.java @@ -26,6 +26,7 @@ import java.nio.file.Paths; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Properties; import java.util.stream.Collectors; import lombok.Data; @@ -151,6 +152,10 @@ public class RunSync { "Running sync for basePath {} for following table formats {}", table.getTableBasePath(), tableFormatList); + Properties sourceProperties = new Properties(); + if (table.getPartitionSpec() != null) { + sourceProperties.put(HudiSourceConfig.PARTITION_FIELD_SPEC_CONFIG, table.getPartitionSpec()); + } SourceTable sourceTable = SourceTable.builder() .name(table.getTableName()) @@ -158,6 +163,7 @@ public class RunSync { .namespace(table.getNamespace() == null ? null : table.getNamespace().split("\\.")) .dataPath(table.getTableDataPath()) .catalogConfig(icebergCatalogConfig) + .additionalProperties(sourceProperties) .formatName(sourceFormat) .build(); List<TargetTable> targetTables = @@ -182,9 +188,6 @@ public class RunSync { .sourceTable(sourceTable) .targetTables(targetTables) .syncMode(SyncMode.INCREMENTAL) - .properties( - Collections.singletonMap( - HudiSourceConfig.PARTITION_FIELD_SPEC_CONFIG, table.getPartitionSpec())) .build(); try { conversionController.sync(tableSyncConfig, conversionSourceProvider);