This is an automated email from the ASF dual-hosted git repository.

timbrown pushed a commit to branch 77-schema-catalog
in repository https://gitbox.apache.org/repos/asf/incubator-xtable.git
commit c6dd14bc73080d1f908ee5365a37c00a01f3b9a4 Author: Timothy Brown <[email protected]> AuthorDate: Wed Apr 24 13:52:11 2024 -0500 remove schema catalog and schema versions --- .../org/apache/xtable/model/InternalSnapshot.java | 3 - .../apache/xtable/model/schema/SchemaCatalog.java | 41 -------- .../apache/xtable/model/schema/SchemaVersion.java | 35 ------- .../xtable/model/storage/InternalDataFile.java | 3 - .../xtable/spi/extractor/ConversionSource.java | 10 -- .../spi/extractor/SchemaCatalogExtractor.java | 31 ------ .../spi/extractor/TestExtractFromSource.java | 8 +- .../apache/xtable/delta/DeltaConversionSource.java | 14 --- .../apache/xtable/hudi/HudiConversionSource.java | 7 -- .../apache/xtable/hudi/HudiDataFileExtractor.java | 3 - .../xtable/hudi/HudiSchemaCatalogExtractor.java | 47 --------- .../xtable/iceberg/IcebergConversionSource.java | 16 ---- .../delta/ITDeltaConversionTargetSource.java | 18 ---- .../java/org/apache/xtable/hudi/HudiTestUtil.java | 3 - .../xtable/hudi/ITHudiConversionSourceTarget.java | 3 - .../xtable/hudi/TestBaseFileUpdatesExtractor.java | 2 - .../xtable/hudi/TestHudiFileStatsExtractor.java | 3 - .../iceberg/TestIcebergConversionTargetSource.java | 31 ------ .../org/apache/xtable/iceberg/TestIcebergSync.java | 106 +++++++-------------- 19 files changed, 37 insertions(+), 347 deletions(-) diff --git a/api/src/main/java/org/apache/xtable/model/InternalSnapshot.java b/api/src/main/java/org/apache/xtable/model/InternalSnapshot.java index 4c234922..345fda78 100644 --- a/api/src/main/java/org/apache/xtable/model/InternalSnapshot.java +++ b/api/src/main/java/org/apache/xtable/model/InternalSnapshot.java @@ -25,7 +25,6 @@ import java.util.List; import lombok.Builder; import lombok.Value; -import org.apache.xtable.model.schema.SchemaCatalog; import org.apache.xtable.model.storage.PartitionFileGroup; /** @@ -44,8 +43,6 @@ public class InternalSnapshot { String version; // Table reference InternalTable table; - // Schema catalog referencing the written schema for each data file in the snapshot - SchemaCatalog schemaCatalog; // Data files grouped by partition List<PartitionFileGroup> partitionedDataFiles; // pending commits before latest commit on the table. diff --git a/api/src/main/java/org/apache/xtable/model/schema/SchemaCatalog.java b/api/src/main/java/org/apache/xtable/model/schema/SchemaCatalog.java deleted file mode 100644 index 64261425..00000000 --- a/api/src/main/java/org/apache/xtable/model/schema/SchemaCatalog.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.xtable.model.schema; - -import java.util.Map; - -import lombok.Builder; -import lombok.Getter; -import lombok.Value; - -/** - * Mapping of schema version to the actual schema. 
- * - * @since 0.1 - */ -@Getter -@Value -@Builder -public class SchemaCatalog { - Map<SchemaVersion, InternalSchema> schemas; - - public SchemaCatalog(Map<SchemaVersion, InternalSchema> schemas) { - this.schemas = schemas; - } -} diff --git a/api/src/main/java/org/apache/xtable/model/schema/SchemaVersion.java b/api/src/main/java/org/apache/xtable/model/schema/SchemaVersion.java deleted file mode 100644 index 9e6130b6..00000000 --- a/api/src/main/java/org/apache/xtable/model/schema/SchemaVersion.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.xtable.model.schema; - -import lombok.EqualsAndHashCode; -import lombok.Getter; -import lombok.Value; - -/** - * Represents the schema version. - * - * @since 0.1 - */ -@Getter -@Value -public class SchemaVersion { - int version; - @EqualsAndHashCode.Exclude String evolutionComment; -} diff --git a/api/src/main/java/org/apache/xtable/model/storage/InternalDataFile.java b/api/src/main/java/org/apache/xtable/model/storage/InternalDataFile.java index 52effbe2..3aee766e 100644 --- a/api/src/main/java/org/apache/xtable/model/storage/InternalDataFile.java +++ b/api/src/main/java/org/apache/xtable/model/storage/InternalDataFile.java @@ -25,7 +25,6 @@ import lombok.Builder; import lombok.NonNull; import lombok.Value; -import org.apache.xtable.model.schema.SchemaVersion; import org.apache.xtable.model.stat.ColumnStat; import org.apache.xtable.model.stat.PartitionValue; @@ -37,8 +36,6 @@ import org.apache.xtable.model.stat.PartitionValue; @Builder(toBuilder = true) @Value public class InternalDataFile { - // written schema version - SchemaVersion schemaVersion; // physical path of the file (absolute with scheme) @NonNull String physicalPath; // file format diff --git a/api/src/main/java/org/apache/xtable/spi/extractor/ConversionSource.java b/api/src/main/java/org/apache/xtable/spi/extractor/ConversionSource.java index ed1486b9..2500454c 100644 --- a/api/src/main/java/org/apache/xtable/spi/extractor/ConversionSource.java +++ b/api/src/main/java/org/apache/xtable/spi/extractor/ConversionSource.java @@ -26,7 +26,6 @@ import org.apache.xtable.model.InstantsForIncrementalSync; import org.apache.xtable.model.InternalSnapshot; import org.apache.xtable.model.InternalTable; import org.apache.xtable.model.TableChange; -import org.apache.xtable.model.schema.SchemaCatalog; /** * A client that provides the major functionality for extracting the state at a given instant in a @@ -42,15 +41,6 @@ public interface ConversionSource<COMMIT> extends Closeable { */ InternalTable getTable(COMMIT commit); - /** - * Extracts the {@link SchemaCatalog} as of the provided instant. 
- * - * @param table the current state of the table for this commit - * @param commit the commit to consider for reading the schema catalog - * @return the schema catalog - */ - SchemaCatalog getSchemaCatalog(InternalTable table, COMMIT commit); - /** * Extracts the {@link InternalSnapshot} as of latest state. * diff --git a/api/src/main/java/org/apache/xtable/spi/extractor/SchemaCatalogExtractor.java b/api/src/main/java/org/apache/xtable/spi/extractor/SchemaCatalogExtractor.java deleted file mode 100644 index 12a078e5..00000000 --- a/api/src/main/java/org/apache/xtable/spi/extractor/SchemaCatalogExtractor.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.xtable.spi.extractor; - -import org.apache.xtable.model.schema.SchemaCatalog; - -/** - * Extracts and holds versioned schemas from {@link CLIENT} - * - * @param <CLIENT> table format client from which table metadata can be retrieved and Schema Catalog - * is built. Can use `HoodieTableMetaClient` for Hudi and `DeltaLog` for Delta. 
- */ -public interface SchemaCatalogExtractor<CLIENT> { - SchemaCatalog catalog(CLIENT client); -} diff --git a/api/src/test/java/org/apache/xtable/spi/extractor/TestExtractFromSource.java b/api/src/test/java/org/apache/xtable/spi/extractor/TestExtractFromSource.java index 82b61609..8503057d 100644 --- a/api/src/test/java/org/apache/xtable/spi/extractor/TestExtractFromSource.java +++ b/api/src/test/java/org/apache/xtable/spi/extractor/TestExtractFromSource.java @@ -41,7 +41,6 @@ import org.apache.xtable.model.InstantsForIncrementalSync; import org.apache.xtable.model.InternalSnapshot; import org.apache.xtable.model.InternalTable; import org.apache.xtable.model.TableChange; -import org.apache.xtable.model.schema.SchemaCatalog; import org.apache.xtable.model.storage.DataFilesDiff; import org.apache.xtable.model.storage.InternalDataFile; import org.apache.xtable.model.storage.PartitionFileGroup; @@ -52,14 +51,9 @@ public class TestExtractFromSource { @Test public void extractSnapshot() { InternalTable table = InternalTable.builder().latestCommitTime(Instant.now()).build(); - SchemaCatalog schemaCatalog = new SchemaCatalog(Collections.emptyMap()); List<PartitionFileGroup> dataFiles = Collections.emptyList(); InternalSnapshot internalSnapshot = - InternalSnapshot.builder() - .schemaCatalog(schemaCatalog) - .table(table) - .partitionedDataFiles(dataFiles) - .build(); + InternalSnapshot.builder().table(table).partitionedDataFiles(dataFiles).build(); when(mockConversionSource.getCurrentSnapshot()).thenReturn(internalSnapshot); assertEquals(internalSnapshot, ExtractFromSource.of(mockConversionSource).extractSnapshot()); } diff --git a/core/src/main/java/org/apache/xtable/delta/DeltaConversionSource.java b/core/src/main/java/org/apache/xtable/delta/DeltaConversionSource.java index 9a00e54c..19ecc02c 100644 --- a/core/src/main/java/org/apache/xtable/delta/DeltaConversionSource.java +++ b/core/src/main/java/org/apache/xtable/delta/DeltaConversionSource.java @@ -21,10 +21,8 @@ package org.apache.xtable.delta; import java.sql.Timestamp; import java.time.Instant; import java.util.ArrayList; -import java.util.HashMap; import java.util.HashSet; import java.util.List; -import java.util.Map; import java.util.Optional; import java.util.Set; @@ -51,8 +49,6 @@ import org.apache.xtable.model.InternalSnapshot; import org.apache.xtable.model.InternalTable; import org.apache.xtable.model.TableChange; import org.apache.xtable.model.schema.InternalSchema; -import org.apache.xtable.model.schema.SchemaCatalog; -import org.apache.xtable.model.schema.SchemaVersion; import org.apache.xtable.model.storage.DataFilesDiff; import org.apache.xtable.model.storage.FileFormat; import org.apache.xtable.model.storage.InternalDataFile; @@ -85,15 +81,6 @@ public class DeltaConversionSource implements ConversionSource<Long> { return tableExtractor.table(deltaLog, tableName, version); } - @Override - public SchemaCatalog getSchemaCatalog(InternalTable table, Long version) { - // TODO: Does not support schema versions for now - Map<SchemaVersion, InternalSchema> schemas = new HashMap<>(); - SchemaVersion schemaVersion = new SchemaVersion(1, ""); - schemas.put(schemaVersion, table.getReadSchema()); - return SchemaCatalog.builder().schemas(schemas).build(); - } - @Override public InternalSnapshot getCurrentSnapshot() { DeltaLog deltaLog = DeltaLog.forTable(sparkSession, basePath); @@ -101,7 +88,6 @@ public class DeltaConversionSource implements ConversionSource<Long> { InternalTable table = getTable(snapshot.version()); return 
InternalSnapshot.builder() .table(table) - .schemaCatalog(getSchemaCatalog(table, snapshot.version())) .partitionedDataFiles(getInternalDataFiles(snapshot, table.getReadSchema())) .build(); } diff --git a/core/src/main/java/org/apache/xtable/hudi/HudiConversionSource.java b/core/src/main/java/org/apache/xtable/hudi/HudiConversionSource.java index bdb798f1..1b1d0bf3 100644 --- a/core/src/main/java/org/apache/xtable/hudi/HudiConversionSource.java +++ b/core/src/main/java/org/apache/xtable/hudi/HudiConversionSource.java @@ -49,7 +49,6 @@ import org.apache.xtable.model.InstantsForIncrementalSync; import org.apache.xtable.model.InternalSnapshot; import org.apache.xtable.model.InternalTable; import org.apache.xtable.model.TableChange; -import org.apache.xtable.model.schema.SchemaCatalog; import org.apache.xtable.spi.extractor.ConversionSource; public class HudiConversionSource implements ConversionSource<HoodieInstant> { @@ -77,11 +76,6 @@ public class HudiConversionSource implements ConversionSource<HoodieInstant> { return tableExtractor.table(metaClient, commit); } - @Override - public SchemaCatalog getSchemaCatalog(InternalTable table, HoodieInstant commit) { - return HudiSchemaCatalogExtractor.catalogWithTableSchema(table); - } - @Override public InternalSnapshot getCurrentSnapshot() { HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline(); @@ -100,7 +94,6 @@ public class HudiConversionSource implements ConversionSource<HoodieInstant> { InternalTable table = getTable(latestCommit); return InternalSnapshot.builder() .table(table) - .schemaCatalog(getSchemaCatalog(table, latestCommit)) .partitionedDataFiles(dataFileExtractor.getFilesCurrentState(table)) .pendingCommits( pendingInstants.stream() diff --git a/core/src/main/java/org/apache/xtable/hudi/HudiDataFileExtractor.java b/core/src/main/java/org/apache/xtable/hudi/HudiDataFileExtractor.java index d14ab859..5739328c 100644 --- a/core/src/main/java/org/apache/xtable/hudi/HudiDataFileExtractor.java +++ b/core/src/main/java/org/apache/xtable/hudi/HudiDataFileExtractor.java @@ -64,7 +64,6 @@ import org.apache.xtable.exception.ReadException; import org.apache.xtable.model.InternalTable; import org.apache.xtable.model.exception.ParseException; import org.apache.xtable.model.schema.InternalPartitionField; -import org.apache.xtable.model.schema.SchemaVersion; import org.apache.xtable.model.stat.PartitionValue; import org.apache.xtable.model.storage.DataFilesDiff; import org.apache.xtable.model.storage.FileFormat; @@ -73,7 +72,6 @@ import org.apache.xtable.model.storage.PartitionFileGroup; /** Extracts all the files for Hudi table represented by {@link InternalTable}. 
*/ public class HudiDataFileExtractor implements AutoCloseable { - private static final SchemaVersion DEFAULT_SCHEMA_VERSION = new SchemaVersion(1, null); private final HoodieTableMetadata tableMetadata; private final HoodieTableMetaClient metaClient; private final HoodieEngineContext engineContext; @@ -397,7 +395,6 @@ public class HudiDataFileExtractor implements AutoCloseable { List<PartitionValue> partitionValues, HoodieBaseFile hoodieBaseFile) { long rowCount = 0L; return InternalDataFile.builder() - .schemaVersion(DEFAULT_SCHEMA_VERSION) .physicalPath(hoodieBaseFile.getPath()) .fileFormat(getFileFormat(FSUtils.getFileExtension(hoodieBaseFile.getPath()))) .partitionValues(partitionValues) diff --git a/core/src/main/java/org/apache/xtable/hudi/HudiSchemaCatalogExtractor.java b/core/src/main/java/org/apache/xtable/hudi/HudiSchemaCatalogExtractor.java deleted file mode 100644 index 0258621f..00000000 --- a/core/src/main/java/org/apache/xtable/hudi/HudiSchemaCatalogExtractor.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.xtable.hudi; - -import java.util.HashMap; -import java.util.Map; - -import org.apache.hudi.common.table.HoodieTableMetaClient; - -import org.apache.xtable.model.InternalTable; -import org.apache.xtable.model.schema.InternalSchema; -import org.apache.xtable.model.schema.SchemaCatalog; -import org.apache.xtable.model.schema.SchemaVersion; -import org.apache.xtable.spi.extractor.SchemaCatalogExtractor; - -/** Implementation of {@link SchemaCatalogExtractor} for Hudi. 
*/ -public class HudiSchemaCatalogExtractor implements SchemaCatalogExtractor<HoodieTableMetaClient> { - @Override - public SchemaCatalog catalog(HoodieTableMetaClient table) { - // TODO implement this - throw new UnsupportedOperationException("Schema catalog extractor not implemented for Hudi"); - } - - public static SchemaCatalog catalogWithTableSchema(InternalTable table) { - // does not support schema versions for now - Map<SchemaVersion, InternalSchema> schemas = new HashMap<>(); - SchemaVersion schemaVersion = new SchemaVersion(1, ""); - schemas.put(schemaVersion, table.getReadSchema()); - return SchemaCatalog.builder().schemas(schemas).build(); - } -} diff --git a/core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSource.java b/core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSource.java index 330d9f33..7b26ee86 100644 --- a/core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSource.java +++ b/core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSource.java @@ -45,8 +45,6 @@ import org.apache.xtable.model.InternalTable; import org.apache.xtable.model.TableChange; import org.apache.xtable.model.schema.InternalPartitionField; import org.apache.xtable.model.schema.InternalSchema; -import org.apache.xtable.model.schema.SchemaCatalog; -import org.apache.xtable.model.schema.SchemaVersion; import org.apache.xtable.model.stat.PartitionValue; import org.apache.xtable.model.storage.*; import org.apache.xtable.model.storage.InternalDataFile; @@ -118,25 +116,12 @@ public class IcebergConversionSource implements ConversionSource<Snapshot> { .build(); } - @Override - public SchemaCatalog getSchemaCatalog(InternalTable table, Snapshot snapshot) { - Table iceTable = getSourceTable(); - Integer iceSchemaId = snapshot.schemaId(); - Schema iceSchema = iceTable.schemas().get(iceSchemaId); - IcebergSchemaExtractor schemaExtractor = IcebergSchemaExtractor.getInstance(); - InternalSchema irSchema = schemaExtractor.fromIceberg(iceSchema); - Map<SchemaVersion, InternalSchema> catalog = - Collections.singletonMap(new SchemaVersion(iceSchemaId, ""), irSchema); - return SchemaCatalog.builder().schemas(catalog).build(); - } - @Override public InternalSnapshot getCurrentSnapshot() { Table iceTable = getSourceTable(); Snapshot currentSnapshot = iceTable.currentSnapshot(); InternalTable irTable = getTable(currentSnapshot); - SchemaCatalog schemaCatalog = getSchemaCatalog(irTable, currentSnapshot); TableScan scan = iceTable.newScan().useSnapshot(currentSnapshot.snapshotId()); PartitionSpec partitionSpec = iceTable.spec(); @@ -156,7 +141,6 @@ public class IcebergConversionSource implements ConversionSource<Snapshot> { return InternalSnapshot.builder() .version(String.valueOf(currentSnapshot.snapshotId())) .table(irTable) - .schemaCatalog(schemaCatalog) .partitionedDataFiles(partitionedDataFiles) .build(); } diff --git a/core/src/test/java/org/apache/xtable/delta/ITDeltaConversionTargetSource.java b/core/src/test/java/org/apache/xtable/delta/ITDeltaConversionTargetSource.java index 6a29bd18..3e2d61f5 100644 --- a/core/src/test/java/org/apache/xtable/delta/ITDeltaConversionTargetSource.java +++ b/core/src/test/java/org/apache/xtable/delta/ITDeltaConversionTargetSource.java @@ -65,8 +65,6 @@ import org.apache.xtable.model.schema.InternalPartitionField; import org.apache.xtable.model.schema.InternalSchema; import org.apache.xtable.model.schema.InternalType; import org.apache.xtable.model.schema.PartitionTransformType; -import org.apache.xtable.model.schema.SchemaCatalog; 
-import org.apache.xtable.model.schema.SchemaVersion; import org.apache.xtable.model.stat.ColumnStat; import org.apache.xtable.model.stat.PartitionValue; import org.apache.xtable.model.stat.Range; @@ -191,11 +189,6 @@ public class ITDeltaConversionTargetSource { DataLayoutStrategy.FLAT, "file:" + basePath, Collections.emptyList()); - // Validate schema catalog - SchemaCatalog schemaCatalog = snapshot.getSchemaCatalog(); - validateSchemaCatalog( - schemaCatalog, - Collections.singletonMap(new SchemaVersion(1, ""), snapshot.getTable().getReadSchema())); // Validate data files List<ColumnStat> columnStats = Arrays.asList(COL1_COLUMN_STAT, COL2_COLUMN_STAT); Assertions.assertEquals(1, snapshot.getPartitionedDataFiles().size()); @@ -269,11 +262,6 @@ public class ITDeltaConversionTargetSource { .sourceField(partCol) .transformType(PartitionTransformType.VALUE) .build())); - // Validate schema catalog - SchemaCatalog schemaCatalog = snapshot.getSchemaCatalog(); - validateSchemaCatalog( - schemaCatalog, - Collections.singletonMap(new SchemaVersion(1, ""), snapshot.getTable().getReadSchema())); // Validate data files List<ColumnStat> columnStats = Arrays.asList(COL1_COLUMN_STAT, COL2_COLUMN_STAT); Assertions.assertEquals(1, snapshot.getPartitionedDataFiles().size()); @@ -689,11 +677,6 @@ public class ITDeltaConversionTargetSource { Assertions.assertEquals(partitioningFields, internalTable.getPartitioningFields()); } - private void validateSchemaCatalog( - SchemaCatalog schemaCatalog, Map<SchemaVersion, InternalSchema> schemas) { - Assertions.assertEquals(schemas, schemaCatalog.getSchemas()); - } - private void validatePartitionDataFiles( PartitionFileGroup expectedPartitionFiles, PartitionFileGroup actualPartitionFiles) throws URISyntaxException { @@ -715,7 +698,6 @@ public class ITDeltaConversionTargetSource { private void validatePropertiesDataFile(InternalDataFile expected, InternalDataFile actual) throws URISyntaxException { - Assertions.assertEquals(expected.getSchemaVersion(), actual.getSchemaVersion()); Assertions.assertTrue( Paths.get(new URI(actual.getPhysicalPath()).getPath()).isAbsolute(), () -> "path == " + actual.getPhysicalPath() + " is not absolute"); diff --git a/core/src/test/java/org/apache/xtable/hudi/HudiTestUtil.java b/core/src/test/java/org/apache/xtable/hudi/HudiTestUtil.java index d21d1d18..c701a1d5 100644 --- a/core/src/test/java/org/apache/xtable/hudi/HudiTestUtil.java +++ b/core/src/test/java/org/apache/xtable/hudi/HudiTestUtil.java @@ -48,11 +48,8 @@ import org.apache.hudi.config.HoodieArchivalConfig; import org.apache.hudi.config.HoodieIndexConfig; import org.apache.hudi.config.HoodieWriteConfig; -import org.apache.xtable.model.schema.SchemaVersion; - @NoArgsConstructor(access = AccessLevel.PRIVATE) public class HudiTestUtil { - static final SchemaVersion SCHEMA_VERSION = new SchemaVersion(1, ""); @SneakyThrows static HoodieTableMetaClient initTableAndGetMetaClient( diff --git a/core/src/test/java/org/apache/xtable/hudi/ITHudiConversionSourceTarget.java b/core/src/test/java/org/apache/xtable/hudi/ITHudiConversionSourceTarget.java index a0d51b7a..26dba2b4 100644 --- a/core/src/test/java/org/apache/xtable/hudi/ITHudiConversionSourceTarget.java +++ b/core/src/test/java/org/apache/xtable/hudi/ITHudiConversionSourceTarget.java @@ -18,7 +18,6 @@ package org.apache.xtable.hudi; -import static org.apache.xtable.hudi.HudiTestUtil.SCHEMA_VERSION; import static org.apache.xtable.hudi.HudiTestUtil.createWriteStatus; import static 
org.apache.xtable.hudi.HudiTestUtil.getHoodieWriteConfig; import static org.apache.xtable.hudi.HudiTestUtil.initTableAndGetMetaClient; @@ -188,7 +187,6 @@ public class ITHudiConversionSourceTarget { .physicalPath( String.format("file://%s/%s/%s", tableBasePath, partitionPath, existingFileName1)) .recordCount(2) - .schemaVersion(SCHEMA_VERSION) .build(); String fileName = "file_1.parquet"; String filePath = getFilePath(partitionPath, fileName); @@ -556,7 +554,6 @@ public class ITHudiConversionSourceTarget { .totalSize(5) .build()); return InternalDataFile.builder() - .schemaVersion(SCHEMA_VERSION) .physicalPath(String.format("file://%s/%s/%s", tableBasePath, partitionPath, fileName)) .fileSizeBytes(FILE_SIZE) .fileFormat(FileFormat.APACHE_PARQUET) diff --git a/core/src/test/java/org/apache/xtable/hudi/TestBaseFileUpdatesExtractor.java b/core/src/test/java/org/apache/xtable/hudi/TestBaseFileUpdatesExtractor.java index dddc736f..8f3b3f7e 100644 --- a/core/src/test/java/org/apache/xtable/hudi/TestBaseFileUpdatesExtractor.java +++ b/core/src/test/java/org/apache/xtable/hudi/TestBaseFileUpdatesExtractor.java @@ -18,7 +18,6 @@ package org.apache.xtable.hudi; -import static org.apache.xtable.hudi.HudiTestUtil.SCHEMA_VERSION; import static org.apache.xtable.hudi.HudiTestUtil.createWriteStatus; import static org.apache.xtable.hudi.HudiTestUtil.getHoodieWriteConfig; import static org.apache.xtable.hudi.HudiTestUtil.initTableAndGetMetaClient; @@ -390,7 +389,6 @@ public class TestBaseFileUpdatesExtractor { private InternalDataFile createFile(String physicalPath, List<ColumnStat> columnStats) { return InternalDataFile.builder() - .schemaVersion(SCHEMA_VERSION) .physicalPath(physicalPath) .fileSizeBytes(FILE_SIZE) .fileFormat(FileFormat.APACHE_PARQUET) diff --git a/core/src/test/java/org/apache/xtable/hudi/TestHudiFileStatsExtractor.java b/core/src/test/java/org/apache/xtable/hudi/TestHudiFileStatsExtractor.java index 07425737..82149c8b 100644 --- a/core/src/test/java/org/apache/xtable/hudi/TestHudiFileStatsExtractor.java +++ b/core/src/test/java/org/apache/xtable/hudi/TestHudiFileStatsExtractor.java @@ -69,7 +69,6 @@ import org.apache.xtable.TestJavaHudiTable; import org.apache.xtable.model.schema.InternalField; import org.apache.xtable.model.schema.InternalSchema; import org.apache.xtable.model.schema.InternalType; -import org.apache.xtable.model.schema.SchemaVersion; import org.apache.xtable.model.stat.ColumnStat; import org.apache.xtable.model.storage.FileFormat; import org.apache.xtable.model.storage.InternalDataFile; @@ -147,7 +146,6 @@ public class TestHudiFileStatsExtractor { InternalDataFile inputFile = InternalDataFile.builder() .physicalPath(parquetFile.toString()) - .schemaVersion(new SchemaVersion(1, null)) .columnStats(Collections.emptyList()) .fileFormat(FileFormat.APACHE_PARQUET) .lastModified(1234L) @@ -184,7 +182,6 @@ public class TestHudiFileStatsExtractor { InternalDataFile inputFile = InternalDataFile.builder() .physicalPath(file.toString()) - .schemaVersion(new SchemaVersion(1, null)) .columnStats(Collections.emptyList()) .fileFormat(FileFormat.APACHE_PARQUET) .lastModified(1234L) diff --git a/core/src/test/java/org/apache/xtable/iceberg/TestIcebergConversionTargetSource.java b/core/src/test/java/org/apache/xtable/iceberg/TestIcebergConversionTargetSource.java index dc3276d6..d100a313 100644 --- a/core/src/test/java/org/apache/xtable/iceberg/TestIcebergConversionTargetSource.java +++ b/core/src/test/java/org/apache/xtable/iceberg/TestIcebergConversionTargetSource.java @@ -38,7 +38,6 
@@ import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; -import org.mockito.Mockito; import org.apache.iceberg.*; import org.apache.iceberg.data.GenericRecord; @@ -59,8 +58,6 @@ import org.apache.xtable.model.TableChange; import org.apache.xtable.model.schema.InternalField; import org.apache.xtable.model.schema.InternalSchema; import org.apache.xtable.model.schema.PartitionTransformType; -import org.apache.xtable.model.schema.SchemaCatalog; -import org.apache.xtable.model.schema.SchemaVersion; import org.apache.xtable.model.stat.PartitionValue; import org.apache.xtable.model.storage.*; import org.apache.xtable.model.storage.FileFormat; @@ -121,32 +118,6 @@ class TestIcebergConversionTargetSource { internalTable.getPartitioningFields().get(0).getTransformType()); } - @Test - public void getSchemaCatalogTest(@TempDir Path workingDir) throws IOException { - Table catalogSales = createTestTableWithData(workingDir.toString()); - Snapshot iceCurrentSnapshot = catalogSales.currentSnapshot(); - - // add extra schema to test current schema is returned - catalogSales.updateSchema().addColumn("new_column", Types.IntegerType.get()).commit(); - assertEquals(2, catalogSales.schemas().size()); - assertEquals(0, iceCurrentSnapshot.schemaId()); - - PerTableConfig sourceTableConfig = getPerTableConfig(catalogSales); - - IcebergConversionSource conversionSource = - sourceProvider.getConversionSourceInstance(sourceTableConfig); - IcebergConversionSource spyConversionSource = Mockito.spy(conversionSource); - - SchemaCatalog schemaCatalog = spyConversionSource.getSchemaCatalog(null, iceCurrentSnapshot); - Assertions.assertNotNull(schemaCatalog); - Map<SchemaVersion, InternalSchema> schemas = schemaCatalog.getSchemas(); - assertEquals(1, schemas.size()); - SchemaVersion expectedSchemaVersion = new SchemaVersion(iceCurrentSnapshot.schemaId(), ""); - InternalSchema irSchemaOfCommit = schemas.get(expectedSchemaVersion); - Assertions.assertNotNull(irSchemaOfCommit); - validateSchema(irSchemaOfCommit, catalogSales.schemas().get(iceCurrentSnapshot.schemaId())); - } - @Test public void testGetCurrentSnapshot(@TempDir Path workingDir) throws IOException { Table catalogSales = createTestTableWithData(workingDir.toString()); @@ -172,8 +143,6 @@ class TestIcebergConversionTargetSource { assertEquals(String.valueOf(iceCurrentSnapshot.snapshotId()), internalSnapshot.getVersion()); Assertions.assertNotNull(internalSnapshot.getTable()); verify(spyConversionSource, times(1)).getTable(iceCurrentSnapshot); - verify(spyConversionSource, times(1)) - .getSchemaCatalog(internalSnapshot.getTable(), iceCurrentSnapshot); verify(spyPartitionConverter, times(5)).toXTable(any(), any(), any()); verify(spyDataFileExtractor, times(5)).fromIceberg(any(), any(), any()); diff --git a/core/src/test/java/org/apache/xtable/iceberg/TestIcebergSync.java b/core/src/test/java/org/apache/xtable/iceberg/TestIcebergSync.java index 3d479b4e..fa8b162a 100644 --- a/core/src/test/java/org/apache/xtable/iceberg/TestIcebergSync.java +++ b/core/src/test/java/org/apache/xtable/iceberg/TestIcebergSync.java @@ -41,7 +41,6 @@ import java.time.Instant; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; @@ -91,8 +90,6 @@ import org.apache.xtable.model.schema.InternalPartitionField; import org.apache.xtable.model.schema.InternalSchema; 
import org.apache.xtable.model.schema.InternalType; import org.apache.xtable.model.schema.PartitionTransformType; -import org.apache.xtable.model.schema.SchemaCatalog; -import org.apache.xtable.model.schema.SchemaVersion; import org.apache.xtable.model.stat.PartitionValue; import org.apache.xtable.model.stat.Range; import org.apache.xtable.model.storage.DataLayoutStrategy; @@ -224,17 +221,12 @@ public class TestIcebergSync { InternalTable table1 = getInternalTable(tableName, basePath, internalSchema, null, LAST_COMMIT_TIME); InternalTable table2 = getInternalTable(tableName, basePath, schema2, null, LAST_COMMIT_TIME); - Map<SchemaVersion, InternalSchema> schemas = new HashMap<>(); - SchemaVersion schemaVersion1 = new SchemaVersion(1, ""); - schemas.put(schemaVersion1, internalSchema); - SchemaVersion schemaVersion2 = new SchemaVersion(2, ""); - schemas.put(schemaVersion2, schema2); - - InternalDataFile dataFile1 = getDataFile(schemaVersion1, 1, Collections.emptyList()); - InternalDataFile dataFile2 = getDataFile(schemaVersion1, 2, Collections.emptyList()); - InternalDataFile dataFile3 = getDataFile(schemaVersion2, 3, Collections.emptyList()); - InternalSnapshot snapshot1 = buildSnapshot(table1, schemas, dataFile1, dataFile2); - InternalSnapshot snapshot2 = buildSnapshot(table2, schemas, dataFile2, dataFile3); + + InternalDataFile dataFile1 = getDataFile(1, Collections.emptyList()); + InternalDataFile dataFile2 = getDataFile(2, Collections.emptyList()); + InternalDataFile dataFile3 = getDataFile(3, Collections.emptyList()); + InternalSnapshot snapshot1 = buildSnapshot(table1, dataFile1, dataFile2); + InternalSnapshot snapshot2 = buildSnapshot(table2, dataFile2, dataFile3); when(mockSchemaExtractor.toIceberg(internalSchema)).thenReturn(icebergSchema); when(mockSchemaExtractor.toIceberg(schema2)).thenReturn(icebergSchema2); ArgumentCaptor<Schema> partitionSpecSchemaArgumentCaptor = @@ -323,19 +315,14 @@ public class TestIcebergSync { getInternalTable(tableName, basePath, internalSchema, null, LAST_COMMIT_TIME); InternalTable table2 = getInternalTable(tableName, basePath, schema2, null, LAST_COMMIT_TIME.plusMillis(100000L)); - Map<SchemaVersion, InternalSchema> schemas = new HashMap<>(); - SchemaVersion schemaVersion1 = new SchemaVersion(1, ""); - schemas.put(schemaVersion1, internalSchema); - SchemaVersion schemaVersion2 = new SchemaVersion(2, ""); - schemas.put(schemaVersion2, schema2); - - InternalDataFile dataFile1 = getDataFile(schemaVersion1, 1, Collections.emptyList()); - InternalDataFile dataFile2 = getDataFile(schemaVersion1, 2, Collections.emptyList()); - InternalDataFile dataFile3 = getDataFile(schemaVersion2, 3, Collections.emptyList()); - InternalDataFile dataFile4 = getDataFile(schemaVersion2, 4, Collections.emptyList()); - InternalSnapshot snapshot1 = buildSnapshot(table1, schemas, dataFile1, dataFile2); - InternalSnapshot snapshot2 = buildSnapshot(table2, schemas, dataFile2, dataFile3); - InternalSnapshot snapshot3 = buildSnapshot(table2, schemas, dataFile3, dataFile4); + + InternalDataFile dataFile1 = getDataFile(1, Collections.emptyList()); + InternalDataFile dataFile2 = getDataFile(2, Collections.emptyList()); + InternalDataFile dataFile3 = getDataFile(3, Collections.emptyList()); + InternalDataFile dataFile4 = getDataFile(4, Collections.emptyList()); + InternalSnapshot snapshot1 = buildSnapshot(table1, dataFile1, dataFile2); + InternalSnapshot snapshot2 = buildSnapshot(table2, dataFile2, dataFile3); + InternalSnapshot snapshot3 = buildSnapshot(table2, dataFile3, dataFile4); 
when(mockSchemaExtractor.toIceberg(internalSchema)).thenReturn(icebergSchema); when(mockSchemaExtractor.toIceberg(schema2)).thenReturn(icebergSchema2); ArgumentCaptor<Schema> partitionSpecSchemaArgumentCaptor = @@ -393,9 +380,6 @@ public class TestIcebergSync { internalSchema, Collections.singletonList(partitionField), LAST_COMMIT_TIME); - Map<SchemaVersion, InternalSchema> schemas = new HashMap<>(); - SchemaVersion schemaVersion = new SchemaVersion(1, ""); - schemas.put(schemaVersion, internalSchema); List<PartitionValue> partitionValues1 = Collections.singletonList( @@ -409,10 +393,10 @@ public class TestIcebergSync { .partitionField(partitionField) .range(Range.scalar(Instant.parse("2022-10-03T00:00:00.00Z").toEpochMilli())) .build()); - InternalDataFile dataFile1 = getDataFile(schemaVersion, 1, partitionValues1); - InternalDataFile dataFile2 = getDataFile(schemaVersion, 2, partitionValues1); - InternalDataFile dataFile3 = getDataFile(schemaVersion, 3, partitionValues2); - InternalSnapshot snapshot = buildSnapshot(table, schemas, dataFile1, dataFile2, dataFile3); + InternalDataFile dataFile1 = getDataFile(1, partitionValues1); + InternalDataFile dataFile2 = getDataFile(2, partitionValues1); + InternalDataFile dataFile3 = getDataFile(3, partitionValues2); + InternalSnapshot snapshot = buildSnapshot(table, dataFile1, dataFile2, dataFile3); when(mockSchemaExtractor.toIceberg(internalSchema)) .thenReturn(icebergSchema) @@ -459,9 +443,6 @@ public class TestIcebergSync { internalSchema, Collections.singletonList(partitionField), LAST_COMMIT_TIME); - Map<SchemaVersion, InternalSchema> schemas = new HashMap<>(); - SchemaVersion schemaVersion = new SchemaVersion(1, ""); - schemas.put(schemaVersion, internalSchema); List<PartitionValue> partitionValues1 = Collections.singletonList( @@ -475,10 +456,10 @@ public class TestIcebergSync { .partitionField(partitionField) .range(Range.scalar(Instant.parse("2022-10-03T00:00:00.00Z").toEpochMilli())) .build()); - InternalDataFile dataFile1 = getDataFile(schemaVersion, 1, partitionValues1); - InternalDataFile dataFile2 = getDataFile(schemaVersion, 2, partitionValues1); - InternalDataFile dataFile3 = getDataFile(schemaVersion, 3, partitionValues2); - InternalSnapshot snapshot = buildSnapshot(table, schemas, dataFile1, dataFile2, dataFile3); + InternalDataFile dataFile1 = getDataFile(1, partitionValues1); + InternalDataFile dataFile2 = getDataFile(2, partitionValues1); + InternalDataFile dataFile3 = getDataFile(3, partitionValues2); + InternalSnapshot snapshot = buildSnapshot(table, dataFile1, dataFile2, dataFile3); when(mockSchemaExtractor.toIceberg(internalSchema)).thenReturn(icebergSchema); PartitionSpec partitionSpec = @@ -522,9 +503,6 @@ public class TestIcebergSync { internalSchema, Collections.singletonList(partitionField), LAST_COMMIT_TIME); - Map<SchemaVersion, InternalSchema> schemas = new HashMap<>(); - SchemaVersion schemaVersion = new SchemaVersion(1, ""); - schemas.put(schemaVersion, internalSchema); List<PartitionValue> partitionValues1 = Collections.singletonList( @@ -532,10 +510,10 @@ public class TestIcebergSync { List<PartitionValue> partitionValues2 = Collections.singletonList( PartitionValue.builder().partitionField(partitionField).range(Range.scalar(2)).build()); - InternalDataFile dataFile1 = getDataFile(schemaVersion, 1, partitionValues1); - InternalDataFile dataFile2 = getDataFile(schemaVersion, 2, partitionValues1); - InternalDataFile dataFile3 = getDataFile(schemaVersion, 3, partitionValues2); - InternalSnapshot snapshot = 
buildSnapshot(table, schemas, dataFile1, dataFile2, dataFile3); + InternalDataFile dataFile1 = getDataFile(1, partitionValues1); + InternalDataFile dataFile2 = getDataFile(2, partitionValues1); + InternalDataFile dataFile3 = getDataFile(3, partitionValues2); + InternalSnapshot snapshot = buildSnapshot(table, dataFile1, dataFile2, dataFile3); when(mockSchemaExtractor.toIceberg(internalSchema)).thenReturn(icebergSchema); PartitionSpec partitionSpec = @@ -585,9 +563,6 @@ public class TestIcebergSync { internalSchema, Arrays.asList(partitionField1, partitionField2), LAST_COMMIT_TIME); - Map<SchemaVersion, InternalSchema> schemas = new HashMap<>(); - SchemaVersion schemaVersion = new SchemaVersion(1, ""); - schemas.put(schemaVersion, internalSchema); List<PartitionValue> partitionValues1 = Arrays.asList( @@ -610,10 +585,10 @@ public class TestIcebergSync { .partitionField(partitionField2) .range(Range.scalar(Instant.parse("2022-10-03T00:00:00.00Z").toEpochMilli())) .build()); - InternalDataFile dataFile1 = getDataFile(schemaVersion, 1, partitionValues1); - InternalDataFile dataFile2 = getDataFile(schemaVersion, 2, partitionValues2); - InternalDataFile dataFile3 = getDataFile(schemaVersion, 3, partitionValues3); - InternalSnapshot snapshot = buildSnapshot(table, schemas, dataFile1, dataFile2, dataFile3); + InternalDataFile dataFile1 = getDataFile(1, partitionValues1); + InternalDataFile dataFile2 = getDataFile(2, partitionValues2); + InternalDataFile dataFile3 = getDataFile(3, partitionValues3); + InternalSnapshot snapshot = buildSnapshot(table, dataFile1, dataFile2, dataFile3); when(mockSchemaExtractor.toIceberg(internalSchema)).thenReturn(icebergSchema); PartitionSpec partitionSpec = @@ -663,9 +638,6 @@ public class TestIcebergSync { internalSchema, Collections.singletonList(partitionField), LAST_COMMIT_TIME); - Map<SchemaVersion, InternalSchema> schemas = new HashMap<>(); - SchemaVersion schemaVersion = new SchemaVersion(1, ""); - schemas.put(schemaVersion, internalSchema); List<PartitionValue> partitionValues1 = Collections.singletonList( @@ -679,10 +651,10 @@ public class TestIcebergSync { .partitionField(partitionField) .range(Range.scalar("value2")) .build()); - InternalDataFile dataFile1 = getDataFile(schemaVersion, 1, partitionValues1); - InternalDataFile dataFile2 = getDataFile(schemaVersion, 2, partitionValues1); - InternalDataFile dataFile3 = getDataFile(schemaVersion, 3, partitionValues2); - InternalSnapshot snapshot = buildSnapshot(table, schemas, dataFile1, dataFile2, dataFile3); + InternalDataFile dataFile1 = getDataFile(1, partitionValues1); + InternalDataFile dataFile2 = getDataFile(2, partitionValues1); + InternalDataFile dataFile3 = getDataFile(3, partitionValues2); + InternalSnapshot snapshot = buildSnapshot(table, dataFile1, dataFile2, dataFile3); when(mockSchemaExtractor.toIceberg(internalSchema)).thenReturn(icebergSchema); PartitionSpec partitionSpec = @@ -707,26 +679,20 @@ public class TestIcebergSync { Expressions.equal(partitionField.getSourceField().getPath(), "value1")); } - private InternalSnapshot buildSnapshot( - InternalTable table, - Map<SchemaVersion, InternalSchema> schemas, - InternalDataFile... dataFiles) { + private InternalSnapshot buildSnapshot(InternalTable table, InternalDataFile... 
dataFiles) {
     return InternalSnapshot.builder()
         .table(table)
-        .schemaCatalog(SchemaCatalog.builder().schemas(schemas).build())
         .partitionedDataFiles(PartitionFileGroup.fromFiles(Arrays.asList(dataFiles)))
         .build();
   }

-  private InternalDataFile getDataFile(
-      SchemaVersion schemaVersion, int index, List<PartitionValue> partitionValues) {
+  private InternalDataFile getDataFile(int index, List<PartitionValue> partitionValues) {
     String physicalPath = "file:/physical" + index + ".parquet";
     return InternalDataFile.builder()
         .fileFormat(FileFormat.APACHE_PARQUET)
         .fileSizeBytes(RANDOM.nextInt(10000))
         .physicalPath(physicalPath)
         .recordCount(RANDOM.nextInt(10000))
-        .schemaVersion(schemaVersion)
         .partitionValues(partitionValues)
         .columnStats(Collections.emptyList())
         .build();
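For downstream callers of these model classes, here is a minimal sketch of what the call sites look like after this commit. The builder and field names are taken from the diff above; the table setup, file path, and size values are hypothetical placeholders, not code from the repository.

```java
import java.time.Instant;
import java.util.Collections;

import org.apache.xtable.model.InternalSnapshot;
import org.apache.xtable.model.InternalTable;
import org.apache.xtable.model.storage.FileFormat;
import org.apache.xtable.model.storage.InternalDataFile;
import org.apache.xtable.model.storage.PartitionFileGroup;

public class SchemaCatalogRemovalSketch {
  public static void main(String[] args) {
    // The single read schema now lives on InternalTable; there is no
    // SchemaCatalog or SchemaVersion to populate alongside it.
    InternalTable table = InternalTable.builder().latestCommitTime(Instant.now()).build();

    // InternalDataFile no longer carries a schemaVersion field.
    InternalDataFile dataFile =
        InternalDataFile.builder()
            .physicalPath("file:/physical1.parquet") // hypothetical path
            .fileFormat(FileFormat.APACHE_PARQUET)
            .fileSizeBytes(1024)
            .recordCount(10)
            .partitionValues(Collections.emptyList())
            .columnStats(Collections.emptyList())
            .build();

    // InternalSnapshot.builder() no longer accepts schemaCatalog(...).
    InternalSnapshot snapshot =
        InternalSnapshot.builder()
            .table(table)
            .partitionedDataFiles(
                PartitionFileGroup.fromFiles(Collections.singletonList(dataFile)))
            .build();
  }
}
```

Consumers that previously looked up a schema through the catalog can read it from the table on the snapshot via `snapshot.getTable().getReadSchema()`, which is what the Delta and Hudi conversion sources in this commit were already using to build their single-entry catalogs.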