This is an automated email from the ASF dual-hosted git repository.
timbrown pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-xtable.git
The following commit(s) were added to refs/heads/main by this push:
new 121b3d51 remove schema catalog and schema versions
121b3d51 is described below
commit 121b3d515c625cd2e5d4d427644f86e79ecf88c1
Author: Timothy Brown <[email protected]>
AuthorDate: Wed Apr 24 13:52:11 2024 -0500
remove schema catalog and schema versions
---
.../org/apache/xtable/model/InternalSnapshot.java | 3 -
.../apache/xtable/model/schema/SchemaCatalog.java | 41 --------
.../apache/xtable/model/schema/SchemaVersion.java | 35 -------
.../xtable/model/storage/InternalDataFile.java | 3 -
.../xtable/spi/extractor/ConversionSource.java | 10 --
.../spi/extractor/SchemaCatalogExtractor.java | 31 ------
.../spi/extractor/TestExtractFromSource.java | 8 +-
.../apache/xtable/delta/DeltaConversionSource.java | 14 ---
.../apache/xtable/hudi/HudiConversionSource.java | 7 --
.../apache/xtable/hudi/HudiDataFileExtractor.java | 3 -
.../xtable/hudi/HudiSchemaCatalogExtractor.java | 47 ---------
.../xtable/iceberg/IcebergConversionSource.java | 16 ----
.../delta/ITDeltaConversionTargetSource.java | 18 ----
.../java/org/apache/xtable/hudi/HudiTestUtil.java | 3 -
.../xtable/hudi/ITHudiConversionSourceTarget.java | 3 -
.../xtable/hudi/TestBaseFileUpdatesExtractor.java | 2 -
.../xtable/hudi/TestHudiFileStatsExtractor.java | 3 -
.../iceberg/TestIcebergConversionTargetSource.java | 31 ------
.../org/apache/xtable/iceberg/TestIcebergSync.java | 106 +++++++--------------
19 files changed, 37 insertions(+), 347 deletions(-)
diff --git a/api/src/main/java/org/apache/xtable/model/InternalSnapshot.java b/api/src/main/java/org/apache/xtable/model/InternalSnapshot.java
index 4c234922..345fda78 100644
--- a/api/src/main/java/org/apache/xtable/model/InternalSnapshot.java
+++ b/api/src/main/java/org/apache/xtable/model/InternalSnapshot.java
@@ -25,7 +25,6 @@ import java.util.List;
import lombok.Builder;
import lombok.Value;
-import org.apache.xtable.model.schema.SchemaCatalog;
import org.apache.xtable.model.storage.PartitionFileGroup;
/**
@@ -44,8 +43,6 @@ public class InternalSnapshot {
String version;
// Table reference
InternalTable table;
- // Schema catalog referencing the written schema for each data file in the
snapshot
- SchemaCatalog schemaCatalog;
// Data files grouped by partition
List<PartitionFileGroup> partitionedDataFiles;
// pending commits before latest commit on the table.
diff --git a/api/src/main/java/org/apache/xtable/model/schema/SchemaCatalog.java b/api/src/main/java/org/apache/xtable/model/schema/SchemaCatalog.java
deleted file mode 100644
index 64261425..00000000
--- a/api/src/main/java/org/apache/xtable/model/schema/SchemaCatalog.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.xtable.model.schema;
-
-import java.util.Map;
-
-import lombok.Builder;
-import lombok.Getter;
-import lombok.Value;
-
-/**
- * Mapping of schema version to the actual schema.
- *
- * @since 0.1
- */
-@Getter
-@Value
-@Builder
-public class SchemaCatalog {
- Map<SchemaVersion, InternalSchema> schemas;
-
- public SchemaCatalog(Map<SchemaVersion, InternalSchema> schemas) {
- this.schemas = schemas;
- }
-}
diff --git a/api/src/main/java/org/apache/xtable/model/schema/SchemaVersion.java b/api/src/main/java/org/apache/xtable/model/schema/SchemaVersion.java
deleted file mode 100644
index 9e6130b6..00000000
--- a/api/src/main/java/org/apache/xtable/model/schema/SchemaVersion.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.xtable.model.schema;
-
-import lombok.EqualsAndHashCode;
-import lombok.Getter;
-import lombok.Value;
-
-/**
- * Represents the schema version.
- *
- * @since 0.1
- */
-@Getter
-@Value
-public class SchemaVersion {
- int version;
- @EqualsAndHashCode.Exclude String evolutionComment;
-}
diff --git a/api/src/main/java/org/apache/xtable/model/storage/InternalDataFile.java b/api/src/main/java/org/apache/xtable/model/storage/InternalDataFile.java
index 52effbe2..3aee766e 100644
--- a/api/src/main/java/org/apache/xtable/model/storage/InternalDataFile.java
+++ b/api/src/main/java/org/apache/xtable/model/storage/InternalDataFile.java
@@ -25,7 +25,6 @@ import lombok.Builder;
import lombok.NonNull;
import lombok.Value;
-import org.apache.xtable.model.schema.SchemaVersion;
import org.apache.xtable.model.stat.ColumnStat;
import org.apache.xtable.model.stat.PartitionValue;
@@ -37,8 +36,6 @@ import org.apache.xtable.model.stat.PartitionValue;
@Builder(toBuilder = true)
@Value
public class InternalDataFile {
- // written schema version
- SchemaVersion schemaVersion;
// physical path of the file (absolute with scheme)
@NonNull String physicalPath;
// file format
diff --git a/api/src/main/java/org/apache/xtable/spi/extractor/ConversionSource.java b/api/src/main/java/org/apache/xtable/spi/extractor/ConversionSource.java
index ed1486b9..2500454c 100644
--- a/api/src/main/java/org/apache/xtable/spi/extractor/ConversionSource.java
+++ b/api/src/main/java/org/apache/xtable/spi/extractor/ConversionSource.java
@@ -26,7 +26,6 @@ import org.apache.xtable.model.InstantsForIncrementalSync;
import org.apache.xtable.model.InternalSnapshot;
import org.apache.xtable.model.InternalTable;
import org.apache.xtable.model.TableChange;
-import org.apache.xtable.model.schema.SchemaCatalog;
/**
* A client that provides the major functionality for extracting the state at
a given instant in a
@@ -42,15 +41,6 @@ public interface ConversionSource<COMMIT> extends Closeable {
*/
InternalTable getTable(COMMIT commit);
- /**
- * Extracts the {@link SchemaCatalog} as of the provided instant.
- *
- * @param table the current state of the table for this commit
- * @param commit the commit to consider for reading the schema catalog
- * @return the schema catalog
- */
- SchemaCatalog getSchemaCatalog(InternalTable table, COMMIT commit);
-
/**
* Extracts the {@link InternalSnapshot} as of latest state.
*
diff --git a/api/src/main/java/org/apache/xtable/spi/extractor/SchemaCatalogExtractor.java b/api/src/main/java/org/apache/xtable/spi/extractor/SchemaCatalogExtractor.java
deleted file mode 100644
index 12a078e5..00000000
--- a/api/src/main/java/org/apache/xtable/spi/extractor/SchemaCatalogExtractor.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.xtable.spi.extractor;
-
-import org.apache.xtable.model.schema.SchemaCatalog;
-
-/**
- * Extracts and holds versioned schemas from {@link CLIENT}
- *
- * @param <CLIENT> table format client from which table metadata can be
retrieved and Schema Catalog
- * is built. Can use `HoodieTableMetaClient` for Hudi and `DeltaLog` for
Delta.
- */
-public interface SchemaCatalogExtractor<CLIENT> {
- SchemaCatalog catalog(CLIENT client);
-}
diff --git a/api/src/test/java/org/apache/xtable/spi/extractor/TestExtractFromSource.java b/api/src/test/java/org/apache/xtable/spi/extractor/TestExtractFromSource.java
index 82b61609..8503057d 100644
--- a/api/src/test/java/org/apache/xtable/spi/extractor/TestExtractFromSource.java
+++ b/api/src/test/java/org/apache/xtable/spi/extractor/TestExtractFromSource.java
@@ -41,7 +41,6 @@ import org.apache.xtable.model.InstantsForIncrementalSync;
import org.apache.xtable.model.InternalSnapshot;
import org.apache.xtable.model.InternalTable;
import org.apache.xtable.model.TableChange;
-import org.apache.xtable.model.schema.SchemaCatalog;
import org.apache.xtable.model.storage.DataFilesDiff;
import org.apache.xtable.model.storage.InternalDataFile;
import org.apache.xtable.model.storage.PartitionFileGroup;
@@ -52,14 +51,9 @@ public class TestExtractFromSource {
@Test
public void extractSnapshot() {
InternalTable table =
InternalTable.builder().latestCommitTime(Instant.now()).build();
- SchemaCatalog schemaCatalog = new SchemaCatalog(Collections.emptyMap());
List<PartitionFileGroup> dataFiles = Collections.emptyList();
InternalSnapshot internalSnapshot =
- InternalSnapshot.builder()
- .schemaCatalog(schemaCatalog)
- .table(table)
- .partitionedDataFiles(dataFiles)
- .build();
+
InternalSnapshot.builder().table(table).partitionedDataFiles(dataFiles).build();
when(mockConversionSource.getCurrentSnapshot()).thenReturn(internalSnapshot);
assertEquals(internalSnapshot,
ExtractFromSource.of(mockConversionSource).extractSnapshot());
}
diff --git a/core/src/main/java/org/apache/xtable/delta/DeltaConversionSource.java b/core/src/main/java/org/apache/xtable/delta/DeltaConversionSource.java
index 9a00e54c..19ecc02c 100644
--- a/core/src/main/java/org/apache/xtable/delta/DeltaConversionSource.java
+++ b/core/src/main/java/org/apache/xtable/delta/DeltaConversionSource.java
@@ -21,10 +21,8 @@ package org.apache.xtable.delta;
import java.sql.Timestamp;
import java.time.Instant;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
-import java.util.Map;
import java.util.Optional;
import java.util.Set;
@@ -51,8 +49,6 @@ import org.apache.xtable.model.InternalSnapshot;
import org.apache.xtable.model.InternalTable;
import org.apache.xtable.model.TableChange;
import org.apache.xtable.model.schema.InternalSchema;
-import org.apache.xtable.model.schema.SchemaCatalog;
-import org.apache.xtable.model.schema.SchemaVersion;
import org.apache.xtable.model.storage.DataFilesDiff;
import org.apache.xtable.model.storage.FileFormat;
import org.apache.xtable.model.storage.InternalDataFile;
@@ -85,15 +81,6 @@ public class DeltaConversionSource implements ConversionSource<Long> {
return tableExtractor.table(deltaLog, tableName, version);
}
- @Override
- public SchemaCatalog getSchemaCatalog(InternalTable table, Long version) {
- // TODO: Does not support schema versions for now
- Map<SchemaVersion, InternalSchema> schemas = new HashMap<>();
- SchemaVersion schemaVersion = new SchemaVersion(1, "");
- schemas.put(schemaVersion, table.getReadSchema());
- return SchemaCatalog.builder().schemas(schemas).build();
- }
-
@Override
public InternalSnapshot getCurrentSnapshot() {
DeltaLog deltaLog = DeltaLog.forTable(sparkSession, basePath);
@@ -101,7 +88,6 @@ public class DeltaConversionSource implements ConversionSource<Long> {
InternalTable table = getTable(snapshot.version());
return InternalSnapshot.builder()
.table(table)
- .schemaCatalog(getSchemaCatalog(table, snapshot.version()))
.partitionedDataFiles(getInternalDataFiles(snapshot,
table.getReadSchema()))
.build();
}
diff --git a/core/src/main/java/org/apache/xtable/hudi/HudiConversionSource.java b/core/src/main/java/org/apache/xtable/hudi/HudiConversionSource.java
index bdb798f1..1b1d0bf3 100644
--- a/core/src/main/java/org/apache/xtable/hudi/HudiConversionSource.java
+++ b/core/src/main/java/org/apache/xtable/hudi/HudiConversionSource.java
@@ -49,7 +49,6 @@ import org.apache.xtable.model.InstantsForIncrementalSync;
import org.apache.xtable.model.InternalSnapshot;
import org.apache.xtable.model.InternalTable;
import org.apache.xtable.model.TableChange;
-import org.apache.xtable.model.schema.SchemaCatalog;
import org.apache.xtable.spi.extractor.ConversionSource;
public class HudiConversionSource implements ConversionSource<HoodieInstant> {
@@ -77,11 +76,6 @@ public class HudiConversionSource implements ConversionSource<HoodieInstant> {
return tableExtractor.table(metaClient, commit);
}
- @Override
- public SchemaCatalog getSchemaCatalog(InternalTable table, HoodieInstant
commit) {
- return HudiSchemaCatalogExtractor.catalogWithTableSchema(table);
- }
-
@Override
public InternalSnapshot getCurrentSnapshot() {
HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
@@ -100,7 +94,6 @@ public class HudiConversionSource implements ConversionSource<HoodieInstant> {
InternalTable table = getTable(latestCommit);
return InternalSnapshot.builder()
.table(table)
- .schemaCatalog(getSchemaCatalog(table, latestCommit))
.partitionedDataFiles(dataFileExtractor.getFilesCurrentState(table))
.pendingCommits(
pendingInstants.stream()
diff --git a/core/src/main/java/org/apache/xtable/hudi/HudiDataFileExtractor.java b/core/src/main/java/org/apache/xtable/hudi/HudiDataFileExtractor.java
index d14ab859..5739328c 100644
--- a/core/src/main/java/org/apache/xtable/hudi/HudiDataFileExtractor.java
+++ b/core/src/main/java/org/apache/xtable/hudi/HudiDataFileExtractor.java
@@ -64,7 +64,6 @@ import org.apache.xtable.exception.ReadException;
import org.apache.xtable.model.InternalTable;
import org.apache.xtable.model.exception.ParseException;
import org.apache.xtable.model.schema.InternalPartitionField;
-import org.apache.xtable.model.schema.SchemaVersion;
import org.apache.xtable.model.stat.PartitionValue;
import org.apache.xtable.model.storage.DataFilesDiff;
import org.apache.xtable.model.storage.FileFormat;
@@ -73,7 +72,6 @@ import org.apache.xtable.model.storage.PartitionFileGroup;
/** Extracts all the files for Hudi table represented by {@link
InternalTable}. */
public class HudiDataFileExtractor implements AutoCloseable {
- private static final SchemaVersion DEFAULT_SCHEMA_VERSION = new
SchemaVersion(1, null);
private final HoodieTableMetadata tableMetadata;
private final HoodieTableMetaClient metaClient;
private final HoodieEngineContext engineContext;
@@ -397,7 +395,6 @@ public class HudiDataFileExtractor implements AutoCloseable {
List<PartitionValue> partitionValues, HoodieBaseFile hoodieBaseFile) {
long rowCount = 0L;
return InternalDataFile.builder()
- .schemaVersion(DEFAULT_SCHEMA_VERSION)
.physicalPath(hoodieBaseFile.getPath())
.fileFormat(getFileFormat(FSUtils.getFileExtension(hoodieBaseFile.getPath())))
.partitionValues(partitionValues)
diff --git a/core/src/main/java/org/apache/xtable/hudi/HudiSchemaCatalogExtractor.java b/core/src/main/java/org/apache/xtable/hudi/HudiSchemaCatalogExtractor.java
deleted file mode 100644
index 0258621f..00000000
--- a/core/src/main/java/org/apache/xtable/hudi/HudiSchemaCatalogExtractor.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.xtable.hudi;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.hudi.common.table.HoodieTableMetaClient;
-
-import org.apache.xtable.model.InternalTable;
-import org.apache.xtable.model.schema.InternalSchema;
-import org.apache.xtable.model.schema.SchemaCatalog;
-import org.apache.xtable.model.schema.SchemaVersion;
-import org.apache.xtable.spi.extractor.SchemaCatalogExtractor;
-
-/** Implementation of {@link SchemaCatalogExtractor} for Hudi. */
-public class HudiSchemaCatalogExtractor implements
SchemaCatalogExtractor<HoodieTableMetaClient> {
- @Override
- public SchemaCatalog catalog(HoodieTableMetaClient table) {
- // TODO implement this
- throw new UnsupportedOperationException("Schema catalog extractor not
implemented for Hudi");
- }
-
- public static SchemaCatalog catalogWithTableSchema(InternalTable table) {
- // does not support schema versions for now
- Map<SchemaVersion, InternalSchema> schemas = new HashMap<>();
- SchemaVersion schemaVersion = new SchemaVersion(1, "");
- schemas.put(schemaVersion, table.getReadSchema());
- return SchemaCatalog.builder().schemas(schemas).build();
- }
-}
diff --git a/core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSource.java b/core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSource.java
index 330d9f33..7b26ee86 100644
--- a/core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSource.java
+++ b/core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSource.java
@@ -45,8 +45,6 @@ import org.apache.xtable.model.InternalTable;
import org.apache.xtable.model.TableChange;
import org.apache.xtable.model.schema.InternalPartitionField;
import org.apache.xtable.model.schema.InternalSchema;
-import org.apache.xtable.model.schema.SchemaCatalog;
-import org.apache.xtable.model.schema.SchemaVersion;
import org.apache.xtable.model.stat.PartitionValue;
import org.apache.xtable.model.storage.*;
import org.apache.xtable.model.storage.InternalDataFile;
@@ -118,25 +116,12 @@ public class IcebergConversionSource implements ConversionSource<Snapshot> {
.build();
}
- @Override
- public SchemaCatalog getSchemaCatalog(InternalTable table, Snapshot
snapshot) {
- Table iceTable = getSourceTable();
- Integer iceSchemaId = snapshot.schemaId();
- Schema iceSchema = iceTable.schemas().get(iceSchemaId);
- IcebergSchemaExtractor schemaExtractor =
IcebergSchemaExtractor.getInstance();
- InternalSchema irSchema = schemaExtractor.fromIceberg(iceSchema);
- Map<SchemaVersion, InternalSchema> catalog =
- Collections.singletonMap(new SchemaVersion(iceSchemaId, ""), irSchema);
- return SchemaCatalog.builder().schemas(catalog).build();
- }
-
@Override
public InternalSnapshot getCurrentSnapshot() {
Table iceTable = getSourceTable();
Snapshot currentSnapshot = iceTable.currentSnapshot();
InternalTable irTable = getTable(currentSnapshot);
- SchemaCatalog schemaCatalog = getSchemaCatalog(irTable, currentSnapshot);
TableScan scan =
iceTable.newScan().useSnapshot(currentSnapshot.snapshotId());
PartitionSpec partitionSpec = iceTable.spec();
@@ -156,7 +141,6 @@ public class IcebergConversionSource implements ConversionSource<Snapshot> {
return InternalSnapshot.builder()
.version(String.valueOf(currentSnapshot.snapshotId()))
.table(irTable)
- .schemaCatalog(schemaCatalog)
.partitionedDataFiles(partitionedDataFiles)
.build();
}
diff --git a/core/src/test/java/org/apache/xtable/delta/ITDeltaConversionTargetSource.java b/core/src/test/java/org/apache/xtable/delta/ITDeltaConversionTargetSource.java
index 6a29bd18..3e2d61f5 100644
--- a/core/src/test/java/org/apache/xtable/delta/ITDeltaConversionTargetSource.java
+++ b/core/src/test/java/org/apache/xtable/delta/ITDeltaConversionTargetSource.java
@@ -65,8 +65,6 @@ import org.apache.xtable.model.schema.InternalPartitionField;
import org.apache.xtable.model.schema.InternalSchema;
import org.apache.xtable.model.schema.InternalType;
import org.apache.xtable.model.schema.PartitionTransformType;
-import org.apache.xtable.model.schema.SchemaCatalog;
-import org.apache.xtable.model.schema.SchemaVersion;
import org.apache.xtable.model.stat.ColumnStat;
import org.apache.xtable.model.stat.PartitionValue;
import org.apache.xtable.model.stat.Range;
@@ -191,11 +189,6 @@ public class ITDeltaConversionTargetSource {
DataLayoutStrategy.FLAT,
"file:" + basePath,
Collections.emptyList());
- // Validate schema catalog
- SchemaCatalog schemaCatalog = snapshot.getSchemaCatalog();
- validateSchemaCatalog(
- schemaCatalog,
- Collections.singletonMap(new SchemaVersion(1, ""),
snapshot.getTable().getReadSchema()));
// Validate data files
List<ColumnStat> columnStats = Arrays.asList(COL1_COLUMN_STAT,
COL2_COLUMN_STAT);
Assertions.assertEquals(1, snapshot.getPartitionedDataFiles().size());
@@ -269,11 +262,6 @@ public class ITDeltaConversionTargetSource {
.sourceField(partCol)
.transformType(PartitionTransformType.VALUE)
.build()));
- // Validate schema catalog
- SchemaCatalog schemaCatalog = snapshot.getSchemaCatalog();
- validateSchemaCatalog(
- schemaCatalog,
- Collections.singletonMap(new SchemaVersion(1, ""),
snapshot.getTable().getReadSchema()));
// Validate data files
List<ColumnStat> columnStats = Arrays.asList(COL1_COLUMN_STAT,
COL2_COLUMN_STAT);
Assertions.assertEquals(1, snapshot.getPartitionedDataFiles().size());
@@ -689,11 +677,6 @@ public class ITDeltaConversionTargetSource {
Assertions.assertEquals(partitioningFields,
internalTable.getPartitioningFields());
}
- private void validateSchemaCatalog(
- SchemaCatalog schemaCatalog, Map<SchemaVersion, InternalSchema> schemas)
{
- Assertions.assertEquals(schemas, schemaCatalog.getSchemas());
- }
-
private void validatePartitionDataFiles(
PartitionFileGroup expectedPartitionFiles, PartitionFileGroup
actualPartitionFiles)
throws URISyntaxException {
@@ -715,7 +698,6 @@ public class ITDeltaConversionTargetSource {
private void validatePropertiesDataFile(InternalDataFile expected,
InternalDataFile actual)
throws URISyntaxException {
- Assertions.assertEquals(expected.getSchemaVersion(),
actual.getSchemaVersion());
Assertions.assertTrue(
Paths.get(new URI(actual.getPhysicalPath()).getPath()).isAbsolute(),
() -> "path == " + actual.getPhysicalPath() + " is not absolute");
diff --git a/core/src/test/java/org/apache/xtable/hudi/HudiTestUtil.java b/core/src/test/java/org/apache/xtable/hudi/HudiTestUtil.java
index d21d1d18..c701a1d5 100644
--- a/core/src/test/java/org/apache/xtable/hudi/HudiTestUtil.java
+++ b/core/src/test/java/org/apache/xtable/hudi/HudiTestUtil.java
@@ -48,11 +48,8 @@ import org.apache.hudi.config.HoodieArchivalConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.xtable.model.schema.SchemaVersion;
-
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class HudiTestUtil {
- static final SchemaVersion SCHEMA_VERSION = new SchemaVersion(1, "");
@SneakyThrows
static HoodieTableMetaClient initTableAndGetMetaClient(
diff --git a/core/src/test/java/org/apache/xtable/hudi/ITHudiConversionSourceTarget.java b/core/src/test/java/org/apache/xtable/hudi/ITHudiConversionSourceTarget.java
index a0d51b7a..26dba2b4 100644
--- a/core/src/test/java/org/apache/xtable/hudi/ITHudiConversionSourceTarget.java
+++ b/core/src/test/java/org/apache/xtable/hudi/ITHudiConversionSourceTarget.java
@@ -18,7 +18,6 @@
package org.apache.xtable.hudi;
-import static org.apache.xtable.hudi.HudiTestUtil.SCHEMA_VERSION;
import static org.apache.xtable.hudi.HudiTestUtil.createWriteStatus;
import static org.apache.xtable.hudi.HudiTestUtil.getHoodieWriteConfig;
import static org.apache.xtable.hudi.HudiTestUtil.initTableAndGetMetaClient;
@@ -188,7 +187,6 @@ public class ITHudiConversionSourceTarget {
.physicalPath(
String.format("file://%s/%s/%s", tableBasePath, partitionPath,
existingFileName1))
.recordCount(2)
- .schemaVersion(SCHEMA_VERSION)
.build();
String fileName = "file_1.parquet";
String filePath = getFilePath(partitionPath, fileName);
@@ -556,7 +554,6 @@ public class ITHudiConversionSourceTarget {
.totalSize(5)
.build());
return InternalDataFile.builder()
- .schemaVersion(SCHEMA_VERSION)
.physicalPath(String.format("file://%s/%s/%s", tableBasePath,
partitionPath, fileName))
.fileSizeBytes(FILE_SIZE)
.fileFormat(FileFormat.APACHE_PARQUET)
diff --git a/core/src/test/java/org/apache/xtable/hudi/TestBaseFileUpdatesExtractor.java b/core/src/test/java/org/apache/xtable/hudi/TestBaseFileUpdatesExtractor.java
index dddc736f..8f3b3f7e 100644
--- a/core/src/test/java/org/apache/xtable/hudi/TestBaseFileUpdatesExtractor.java
+++ b/core/src/test/java/org/apache/xtable/hudi/TestBaseFileUpdatesExtractor.java
@@ -18,7 +18,6 @@
package org.apache.xtable.hudi;
-import static org.apache.xtable.hudi.HudiTestUtil.SCHEMA_VERSION;
import static org.apache.xtable.hudi.HudiTestUtil.createWriteStatus;
import static org.apache.xtable.hudi.HudiTestUtil.getHoodieWriteConfig;
import static org.apache.xtable.hudi.HudiTestUtil.initTableAndGetMetaClient;
@@ -390,7 +389,6 @@ public class TestBaseFileUpdatesExtractor {
private InternalDataFile createFile(String physicalPath, List<ColumnStat>
columnStats) {
return InternalDataFile.builder()
- .schemaVersion(SCHEMA_VERSION)
.physicalPath(physicalPath)
.fileSizeBytes(FILE_SIZE)
.fileFormat(FileFormat.APACHE_PARQUET)
diff --git a/core/src/test/java/org/apache/xtable/hudi/TestHudiFileStatsExtractor.java b/core/src/test/java/org/apache/xtable/hudi/TestHudiFileStatsExtractor.java
index 07425737..82149c8b 100644
--- a/core/src/test/java/org/apache/xtable/hudi/TestHudiFileStatsExtractor.java
+++ b/core/src/test/java/org/apache/xtable/hudi/TestHudiFileStatsExtractor.java
@@ -69,7 +69,6 @@ import org.apache.xtable.TestJavaHudiTable;
import org.apache.xtable.model.schema.InternalField;
import org.apache.xtable.model.schema.InternalSchema;
import org.apache.xtable.model.schema.InternalType;
-import org.apache.xtable.model.schema.SchemaVersion;
import org.apache.xtable.model.stat.ColumnStat;
import org.apache.xtable.model.storage.FileFormat;
import org.apache.xtable.model.storage.InternalDataFile;
@@ -147,7 +146,6 @@ public class TestHudiFileStatsExtractor {
InternalDataFile inputFile =
InternalDataFile.builder()
.physicalPath(parquetFile.toString())
- .schemaVersion(new SchemaVersion(1, null))
.columnStats(Collections.emptyList())
.fileFormat(FileFormat.APACHE_PARQUET)
.lastModified(1234L)
@@ -184,7 +182,6 @@ public class TestHudiFileStatsExtractor {
InternalDataFile inputFile =
InternalDataFile.builder()
.physicalPath(file.toString())
- .schemaVersion(new SchemaVersion(1, null))
.columnStats(Collections.emptyList())
.fileFormat(FileFormat.APACHE_PARQUET)
.lastModified(1234L)
diff --git a/core/src/test/java/org/apache/xtable/iceberg/TestIcebergConversionTargetSource.java b/core/src/test/java/org/apache/xtable/iceberg/TestIcebergConversionTargetSource.java
index dc3276d6..d100a313 100644
--- a/core/src/test/java/org/apache/xtable/iceberg/TestIcebergConversionTargetSource.java
+++ b/core/src/test/java/org/apache/xtable/iceberg/TestIcebergConversionTargetSource.java
@@ -38,7 +38,6 @@ import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
-import org.mockito.Mockito;
import org.apache.iceberg.*;
import org.apache.iceberg.data.GenericRecord;
@@ -59,8 +58,6 @@ import org.apache.xtable.model.TableChange;
import org.apache.xtable.model.schema.InternalField;
import org.apache.xtable.model.schema.InternalSchema;
import org.apache.xtable.model.schema.PartitionTransformType;
-import org.apache.xtable.model.schema.SchemaCatalog;
-import org.apache.xtable.model.schema.SchemaVersion;
import org.apache.xtable.model.stat.PartitionValue;
import org.apache.xtable.model.storage.*;
import org.apache.xtable.model.storage.FileFormat;
@@ -121,32 +118,6 @@ class TestIcebergConversionTargetSource {
internalTable.getPartitioningFields().get(0).getTransformType());
}
- @Test
- public void getSchemaCatalogTest(@TempDir Path workingDir) throws
IOException {
- Table catalogSales = createTestTableWithData(workingDir.toString());
- Snapshot iceCurrentSnapshot = catalogSales.currentSnapshot();
-
- // add extra schema to test current schema is returned
- catalogSales.updateSchema().addColumn("new_column",
Types.IntegerType.get()).commit();
- assertEquals(2, catalogSales.schemas().size());
- assertEquals(0, iceCurrentSnapshot.schemaId());
-
- PerTableConfig sourceTableConfig = getPerTableConfig(catalogSales);
-
- IcebergConversionSource conversionSource =
- sourceProvider.getConversionSourceInstance(sourceTableConfig);
- IcebergConversionSource spyConversionSource =
Mockito.spy(conversionSource);
-
- SchemaCatalog schemaCatalog = spyConversionSource.getSchemaCatalog(null,
iceCurrentSnapshot);
- Assertions.assertNotNull(schemaCatalog);
- Map<SchemaVersion, InternalSchema> schemas = schemaCatalog.getSchemas();
- assertEquals(1, schemas.size());
- SchemaVersion expectedSchemaVersion = new
SchemaVersion(iceCurrentSnapshot.schemaId(), "");
- InternalSchema irSchemaOfCommit = schemas.get(expectedSchemaVersion);
- Assertions.assertNotNull(irSchemaOfCommit);
- validateSchema(irSchemaOfCommit,
catalogSales.schemas().get(iceCurrentSnapshot.schemaId()));
- }
-
@Test
public void testGetCurrentSnapshot(@TempDir Path workingDir) throws
IOException {
Table catalogSales = createTestTableWithData(workingDir.toString());
@@ -172,8 +143,6 @@ class TestIcebergConversionTargetSource {
assertEquals(String.valueOf(iceCurrentSnapshot.snapshotId()),
internalSnapshot.getVersion());
Assertions.assertNotNull(internalSnapshot.getTable());
verify(spyConversionSource, times(1)).getTable(iceCurrentSnapshot);
- verify(spyConversionSource, times(1))
- .getSchemaCatalog(internalSnapshot.getTable(), iceCurrentSnapshot);
verify(spyPartitionConverter, times(5)).toXTable(any(), any(), any());
verify(spyDataFileExtractor, times(5)).fromIceberg(any(), any(), any());
diff --git a/core/src/test/java/org/apache/xtable/iceberg/TestIcebergSync.java b/core/src/test/java/org/apache/xtable/iceberg/TestIcebergSync.java
index 3d479b4e..fa8b162a 100644
--- a/core/src/test/java/org/apache/xtable/iceberg/TestIcebergSync.java
+++ b/core/src/test/java/org/apache/xtable/iceberg/TestIcebergSync.java
@@ -41,7 +41,6 @@ import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -91,8 +90,6 @@ import org.apache.xtable.model.schema.InternalPartitionField;
import org.apache.xtable.model.schema.InternalSchema;
import org.apache.xtable.model.schema.InternalType;
import org.apache.xtable.model.schema.PartitionTransformType;
-import org.apache.xtable.model.schema.SchemaCatalog;
-import org.apache.xtable.model.schema.SchemaVersion;
import org.apache.xtable.model.stat.PartitionValue;
import org.apache.xtable.model.stat.Range;
import org.apache.xtable.model.storage.DataLayoutStrategy;
@@ -224,17 +221,12 @@ public class TestIcebergSync {
InternalTable table1 =
getInternalTable(tableName, basePath, internalSchema, null,
LAST_COMMIT_TIME);
InternalTable table2 = getInternalTable(tableName, basePath, schema2,
null, LAST_COMMIT_TIME);
- Map<SchemaVersion, InternalSchema> schemas = new HashMap<>();
- SchemaVersion schemaVersion1 = new SchemaVersion(1, "");
- schemas.put(schemaVersion1, internalSchema);
- SchemaVersion schemaVersion2 = new SchemaVersion(2, "");
- schemas.put(schemaVersion2, schema2);
-
- InternalDataFile dataFile1 = getDataFile(schemaVersion1, 1,
Collections.emptyList());
- InternalDataFile dataFile2 = getDataFile(schemaVersion1, 2,
Collections.emptyList());
- InternalDataFile dataFile3 = getDataFile(schemaVersion2, 3,
Collections.emptyList());
- InternalSnapshot snapshot1 = buildSnapshot(table1, schemas, dataFile1,
dataFile2);
- InternalSnapshot snapshot2 = buildSnapshot(table2, schemas, dataFile2,
dataFile3);
+
+ InternalDataFile dataFile1 = getDataFile(1, Collections.emptyList());
+ InternalDataFile dataFile2 = getDataFile(2, Collections.emptyList());
+ InternalDataFile dataFile3 = getDataFile(3, Collections.emptyList());
+ InternalSnapshot snapshot1 = buildSnapshot(table1, dataFile1, dataFile2);
+ InternalSnapshot snapshot2 = buildSnapshot(table2, dataFile2, dataFile3);
when(mockSchemaExtractor.toIceberg(internalSchema)).thenReturn(icebergSchema);
when(mockSchemaExtractor.toIceberg(schema2)).thenReturn(icebergSchema2);
ArgumentCaptor<Schema> partitionSpecSchemaArgumentCaptor =
@@ -323,19 +315,14 @@ public class TestIcebergSync {
getInternalTable(tableName, basePath, internalSchema, null,
LAST_COMMIT_TIME);
InternalTable table2 =
getInternalTable(tableName, basePath, schema2, null,
LAST_COMMIT_TIME.plusMillis(100000L));
- Map<SchemaVersion, InternalSchema> schemas = new HashMap<>();
- SchemaVersion schemaVersion1 = new SchemaVersion(1, "");
- schemas.put(schemaVersion1, internalSchema);
- SchemaVersion schemaVersion2 = new SchemaVersion(2, "");
- schemas.put(schemaVersion2, schema2);
-
- InternalDataFile dataFile1 = getDataFile(schemaVersion1, 1,
Collections.emptyList());
- InternalDataFile dataFile2 = getDataFile(schemaVersion1, 2,
Collections.emptyList());
- InternalDataFile dataFile3 = getDataFile(schemaVersion2, 3,
Collections.emptyList());
- InternalDataFile dataFile4 = getDataFile(schemaVersion2, 4,
Collections.emptyList());
- InternalSnapshot snapshot1 = buildSnapshot(table1, schemas, dataFile1,
dataFile2);
- InternalSnapshot snapshot2 = buildSnapshot(table2, schemas, dataFile2,
dataFile3);
- InternalSnapshot snapshot3 = buildSnapshot(table2, schemas, dataFile3,
dataFile4);
+
+ InternalDataFile dataFile1 = getDataFile(1, Collections.emptyList());
+ InternalDataFile dataFile2 = getDataFile(2, Collections.emptyList());
+ InternalDataFile dataFile3 = getDataFile(3, Collections.emptyList());
+ InternalDataFile dataFile4 = getDataFile(4, Collections.emptyList());
+ InternalSnapshot snapshot1 = buildSnapshot(table1, dataFile1, dataFile2);
+ InternalSnapshot snapshot2 = buildSnapshot(table2, dataFile2, dataFile3);
+ InternalSnapshot snapshot3 = buildSnapshot(table2, dataFile3, dataFile4);
when(mockSchemaExtractor.toIceberg(internalSchema)).thenReturn(icebergSchema);
when(mockSchemaExtractor.toIceberg(schema2)).thenReturn(icebergSchema2);
ArgumentCaptor<Schema> partitionSpecSchemaArgumentCaptor =
@@ -393,9 +380,6 @@ public class TestIcebergSync {
internalSchema,
Collections.singletonList(partitionField),
LAST_COMMIT_TIME);
- Map<SchemaVersion, InternalSchema> schemas = new HashMap<>();
- SchemaVersion schemaVersion = new SchemaVersion(1, "");
- schemas.put(schemaVersion, internalSchema);
List<PartitionValue> partitionValues1 =
Collections.singletonList(
@@ -409,10 +393,10 @@ public class TestIcebergSync {
.partitionField(partitionField)
.range(Range.scalar(Instant.parse("2022-10-03T00:00:00.00Z").toEpochMilli()))
.build());
- InternalDataFile dataFile1 = getDataFile(schemaVersion, 1,
partitionValues1);
- InternalDataFile dataFile2 = getDataFile(schemaVersion, 2,
partitionValues1);
- InternalDataFile dataFile3 = getDataFile(schemaVersion, 3,
partitionValues2);
- InternalSnapshot snapshot = buildSnapshot(table, schemas, dataFile1,
dataFile2, dataFile3);
+ InternalDataFile dataFile1 = getDataFile(1, partitionValues1);
+ InternalDataFile dataFile2 = getDataFile(2, partitionValues1);
+ InternalDataFile dataFile3 = getDataFile(3, partitionValues2);
+ InternalSnapshot snapshot = buildSnapshot(table, dataFile1, dataFile2,
dataFile3);
when(mockSchemaExtractor.toIceberg(internalSchema))
.thenReturn(icebergSchema)
@@ -459,9 +443,6 @@ public class TestIcebergSync {
internalSchema,
Collections.singletonList(partitionField),
LAST_COMMIT_TIME);
- Map<SchemaVersion, InternalSchema> schemas = new HashMap<>();
- SchemaVersion schemaVersion = new SchemaVersion(1, "");
- schemas.put(schemaVersion, internalSchema);
List<PartitionValue> partitionValues1 =
Collections.singletonList(
@@ -475,10 +456,10 @@ public class TestIcebergSync {
.partitionField(partitionField)
.range(Range.scalar(Instant.parse("2022-10-03T00:00:00.00Z").toEpochMilli()))
.build());
- InternalDataFile dataFile1 = getDataFile(schemaVersion, 1,
partitionValues1);
- InternalDataFile dataFile2 = getDataFile(schemaVersion, 2,
partitionValues1);
- InternalDataFile dataFile3 = getDataFile(schemaVersion, 3,
partitionValues2);
- InternalSnapshot snapshot = buildSnapshot(table, schemas, dataFile1,
dataFile2, dataFile3);
+ InternalDataFile dataFile1 = getDataFile(1, partitionValues1);
+ InternalDataFile dataFile2 = getDataFile(2, partitionValues1);
+ InternalDataFile dataFile3 = getDataFile(3, partitionValues2);
+ InternalSnapshot snapshot = buildSnapshot(table, dataFile1, dataFile2,
dataFile3);
when(mockSchemaExtractor.toIceberg(internalSchema)).thenReturn(icebergSchema);
PartitionSpec partitionSpec =
@@ -522,9 +503,6 @@ public class TestIcebergSync {
internalSchema,
Collections.singletonList(partitionField),
LAST_COMMIT_TIME);
- Map<SchemaVersion, InternalSchema> schemas = new HashMap<>();
- SchemaVersion schemaVersion = new SchemaVersion(1, "");
- schemas.put(schemaVersion, internalSchema);
List<PartitionValue> partitionValues1 =
Collections.singletonList(
@@ -532,10 +510,10 @@ public class TestIcebergSync {
List<PartitionValue> partitionValues2 =
Collections.singletonList(
PartitionValue.builder().partitionField(partitionField).range(Range.scalar(2)).build());
- InternalDataFile dataFile1 = getDataFile(schemaVersion, 1,
partitionValues1);
- InternalDataFile dataFile2 = getDataFile(schemaVersion, 2,
partitionValues1);
- InternalDataFile dataFile3 = getDataFile(schemaVersion, 3,
partitionValues2);
- InternalSnapshot snapshot = buildSnapshot(table, schemas, dataFile1,
dataFile2, dataFile3);
+ InternalDataFile dataFile1 = getDataFile(1, partitionValues1);
+ InternalDataFile dataFile2 = getDataFile(2, partitionValues1);
+ InternalDataFile dataFile3 = getDataFile(3, partitionValues2);
+ InternalSnapshot snapshot = buildSnapshot(table, dataFile1, dataFile2,
dataFile3);
when(mockSchemaExtractor.toIceberg(internalSchema)).thenReturn(icebergSchema);
PartitionSpec partitionSpec =
@@ -585,9 +563,6 @@ public class TestIcebergSync {
internalSchema,
Arrays.asList(partitionField1, partitionField2),
LAST_COMMIT_TIME);
- Map<SchemaVersion, InternalSchema> schemas = new HashMap<>();
- SchemaVersion schemaVersion = new SchemaVersion(1, "");
- schemas.put(schemaVersion, internalSchema);
List<PartitionValue> partitionValues1 =
Arrays.asList(
@@ -610,10 +585,10 @@ public class TestIcebergSync {
.partitionField(partitionField2)
.range(Range.scalar(Instant.parse("2022-10-03T00:00:00.00Z").toEpochMilli()))
.build());
- InternalDataFile dataFile1 = getDataFile(schemaVersion, 1,
partitionValues1);
- InternalDataFile dataFile2 = getDataFile(schemaVersion, 2,
partitionValues2);
- InternalDataFile dataFile3 = getDataFile(schemaVersion, 3,
partitionValues3);
- InternalSnapshot snapshot = buildSnapshot(table, schemas, dataFile1,
dataFile2, dataFile3);
+ InternalDataFile dataFile1 = getDataFile(1, partitionValues1);
+ InternalDataFile dataFile2 = getDataFile(2, partitionValues2);
+ InternalDataFile dataFile3 = getDataFile(3, partitionValues3);
+ InternalSnapshot snapshot = buildSnapshot(table, dataFile1, dataFile2,
dataFile3);
when(mockSchemaExtractor.toIceberg(internalSchema)).thenReturn(icebergSchema);
PartitionSpec partitionSpec =
@@ -663,9 +638,6 @@ public class TestIcebergSync {
internalSchema,
Collections.singletonList(partitionField),
LAST_COMMIT_TIME);
- Map<SchemaVersion, InternalSchema> schemas = new HashMap<>();
- SchemaVersion schemaVersion = new SchemaVersion(1, "");
- schemas.put(schemaVersion, internalSchema);
List<PartitionValue> partitionValues1 =
Collections.singletonList(
@@ -679,10 +651,10 @@ public class TestIcebergSync {
.partitionField(partitionField)
.range(Range.scalar("value2"))
.build());
- InternalDataFile dataFile1 = getDataFile(schemaVersion, 1,
partitionValues1);
- InternalDataFile dataFile2 = getDataFile(schemaVersion, 2,
partitionValues1);
- InternalDataFile dataFile3 = getDataFile(schemaVersion, 3,
partitionValues2);
- InternalSnapshot snapshot = buildSnapshot(table, schemas, dataFile1,
dataFile2, dataFile3);
+ InternalDataFile dataFile1 = getDataFile(1, partitionValues1);
+ InternalDataFile dataFile2 = getDataFile(2, partitionValues1);
+ InternalDataFile dataFile3 = getDataFile(3, partitionValues2);
+ InternalSnapshot snapshot = buildSnapshot(table, dataFile1, dataFile2,
dataFile3);
when(mockSchemaExtractor.toIceberg(internalSchema)).thenReturn(icebergSchema);
PartitionSpec partitionSpec =
@@ -707,26 +679,20 @@ public class TestIcebergSync {
Expressions.equal(partitionField.getSourceField().getPath(),
"value1"));
}
- private InternalSnapshot buildSnapshot(
- InternalTable table,
- Map<SchemaVersion, InternalSchema> schemas,
- InternalDataFile... dataFiles) {
+ private InternalSnapshot buildSnapshot(InternalTable table,
InternalDataFile... dataFiles) {
return InternalSnapshot.builder()
.table(table)
- .schemaCatalog(SchemaCatalog.builder().schemas(schemas).build())
.partitionedDataFiles(PartitionFileGroup.fromFiles(Arrays.asList(dataFiles)))
.build();
}
- private InternalDataFile getDataFile(
- SchemaVersion schemaVersion, int index, List<PartitionValue>
partitionValues) {
+ private InternalDataFile getDataFile(int index, List<PartitionValue>
partitionValues) {
String physicalPath = "file:/physical" + index + ".parquet";
return InternalDataFile.builder()
.fileFormat(FileFormat.APACHE_PARQUET)
.fileSizeBytes(RANDOM.nextInt(10000))
.physicalPath(physicalPath)
.recordCount(RANDOM.nextInt(10000))
- .schemaVersion(schemaVersion)
.partitionValues(partitionValues)
.columnStats(Collections.emptyList())
.build();