the-other-tim-brown commented on code in PR #729: URL: https://github.com/apache/incubator-xtable/pull/729#discussion_r2557197815
########## xtable-core/src/main/java/org/apache/xtable/kernel/DeltaKernelTableExtractor.java: ########## @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.kernel; + +import java.time.Instant; +import java.util.List; +import java.util.stream.Collectors; + +import lombok.Builder; + +import io.delta.kernel.*; Review Comment: nitpick: remove star import ########## xtable-core/src/main/java/org/apache/xtable/kernel/DeltaKernelTableExtractor.java: ########## @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.kernel; + +import java.time.Instant; +import java.util.List; +import java.util.stream.Collectors; + +import lombok.Builder; + +import io.delta.kernel.*; +import io.delta.kernel.engine.Engine; +import io.delta.kernel.types.StructField; +import io.delta.kernel.types.StructType; + +import org.apache.xtable.model.InternalTable; +import org.apache.xtable.model.schema.InternalPartitionField; +import org.apache.xtable.model.schema.InternalSchema; +import org.apache.xtable.model.storage.DataLayoutStrategy; +import org.apache.xtable.model.storage.TableFormat; + +/** + * Extracts {@link InternalTable} canonical representation of a table at a point in time for Delta. + */ +@Builder +public class DeltaKernelTableExtractor { + @Builder.Default + private static final DeltaKernelSchemaExtractor schemaExtractor = + DeltaKernelSchemaExtractor.getInstance(); + + private final String basePath; + + public InternalTable table( + Table deltaKernelTable, Snapshot snapshot, Engine engine, String tableName, String basePath) { + try { + // Get schema from Delta Kernel's snapshot + io.delta.kernel.types.StructType schema = snapshot.getSchema(); + InternalSchema internalSchema = schemaExtractor.toInternalSchema(schema); + // Get partition columns); + StructType fullSchema = snapshot.getSchema(); // The full table schema + List<String> partitionColumns = snapshot.getPartitionColumnNames(); // List<String> Review Comment: ```suggestion List<String> partitionColumns = snapshot.getPartitionColumnNames(); ``` ########## xtable-core/src/main/java/org/apache/xtable/kernel/DeltaKernelConversionSource.java: ########## @@ -0,0 +1,239 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.kernel; + +import java.io.IOException; +import java.sql.Timestamp; +import java.time.Instant; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +import lombok.Builder; +import lombok.extern.slf4j.Slf4j; + +import io.delta.kernel.Snapshot; +import io.delta.kernel.Table; +import io.delta.kernel.engine.Engine; +import io.delta.kernel.internal.SnapshotImpl; +import io.delta.kernel.internal.actions.AddFile; +import io.delta.kernel.internal.actions.RemoveFile; +import io.delta.kernel.internal.actions.RowBackedAction; +import io.delta.kernel.internal.util.VectorUtils; + +import org.apache.xtable.exception.ReadException; +import org.apache.xtable.model.CommitsBacklog; +import org.apache.xtable.model.InstantsForIncrementalSync; +import org.apache.xtable.model.InternalSnapshot; +import org.apache.xtable.model.InternalTable; +import org.apache.xtable.model.TableChange; +import org.apache.xtable.model.schema.InternalSchema; +import org.apache.xtable.model.storage.FileFormat; +import org.apache.xtable.model.storage.InternalDataFile; +import org.apache.xtable.model.storage.InternalFilesDiff; +import org.apache.xtable.model.storage.PartitionFileGroup; +import org.apache.xtable.spi.extractor.ConversionSource; +import 
org.apache.xtable.spi.extractor.DataFileIterator; + +@Slf4j +@Builder +public class DeltaKernelConversionSource implements ConversionSource<Long> { + + @Builder.Default + private final DeltaKernelDataFileExtractor dataFileExtractor = + DeltaKernelDataFileExtractor.builder().build(); + + @Builder.Default + private final DeltaKernelActionsConverter actionsConverter = + DeltaKernelActionsConverter.getInstance(); + + private final String basePath; + private final String tableName; + private final Engine engine; + + @Builder.Default + private final DeltaKernelTableExtractor tableExtractor = + DeltaKernelTableExtractor.builder().build(); + + private Optional<DeltaKernelIncrementalChangesState> deltaKernelIncrementalChangesState = + Optional.empty(); + + @Override + public InternalTable getTable(Long version) { + try { + Table table = Table.forPath(engine, basePath); + Snapshot snapshot = table.getSnapshotAsOfVersion(engine, version); + return tableExtractor.table(table, snapshot, engine, tableName, basePath); + } catch (Exception e) { + throw new ReadException("Failed to get table at version " + version, e); + } + } + + @Override + public InternalTable getCurrentTable() { + Table table = Table.forPath(engine, basePath); + Snapshot snapshot = table.getLatestSnapshot(engine); + return getTable(snapshot.getVersion()); + } + + @Override + public InternalSnapshot getCurrentSnapshot() { + Table table_snapshot = Table.forPath(engine, basePath); + Snapshot snapshot = table_snapshot.getLatestSnapshot(engine); + InternalTable table = getTable(snapshot.getVersion()); + return InternalSnapshot.builder() + .table(table) + .partitionedDataFiles( + getInternalDataFiles(snapshot, table_snapshot, engine, table.getReadSchema())) + .sourceIdentifier(getCommitIdentifier(snapshot.getVersion())) + .build(); + } + + @Override + public TableChange getTableChangeForCommit(Long versionNumber) { + Table table = Table.forPath(engine, basePath); + Snapshot snapshot = 
table.getSnapshotAsOfVersion(engine, versionNumber); + InternalTable tableAtVersion = + tableExtractor.table(table, snapshot, engine, tableName, basePath); + Map<String, InternalDataFile> addedFiles = new HashMap<>(); + Map<String, InternalDataFile> removedFiles = new HashMap<>(); + String provider = ((SnapshotImpl) snapshot).getMetadata().getFormat().getProvider(); + FileFormat fileFormat = actionsConverter.convertToFileFormat(provider); + + List<RowBackedAction> actionsForVersion = getChangesState().getActionsForVersion(versionNumber); + + for (RowBackedAction action : actionsForVersion) { + if (action instanceof AddFile) { + AddFile addFile = (AddFile) action; + Map<String, String> partitionValues = VectorUtils.toJavaMap(addFile.getPartitionValues()); + InternalDataFile dataFile = + actionsConverter.convertAddActionToInternalDataFile( + addFile, + table, + fileFormat, + tableAtVersion.getPartitioningFields(), + tableAtVersion.getReadSchema().getFields(), + true, + DeltaKernelPartitionExtractor.getInstance(), + DeltaKernelStatsExtractor.getInstance(), + partitionValues); + addedFiles.put(dataFile.getPhysicalPath(), dataFile); + } else if (action instanceof RemoveFile) { + RemoveFile removeFile = (RemoveFile) action; + Map<String, String> partitionValues = + removeFile + .getPartitionValues() + .map(VectorUtils::<String, String>toJavaMap) + .orElse(Collections.emptyMap()); + InternalDataFile dataFile = + actionsConverter.convertRemoveActionToInternalDataFile( + removeFile, + table, + fileFormat, + tableAtVersion.getPartitioningFields(), + DeltaKernelPartitionExtractor.getInstance(), + partitionValues); + removedFiles.put(dataFile.getPhysicalPath(), dataFile); + } + } + + InternalFilesDiff internalFilesDiff = + InternalFilesDiff.builder() + .filesAdded(addedFiles.values()) + .filesRemoved(removedFiles.values()) + .build(); + return TableChange.builder() + .tableAsOfChange(tableAtVersion) + .filesDiff(internalFilesDiff) + 
.sourceIdentifier(getCommitIdentifier(versionNumber)) + .build(); + } + + @Override + public CommitsBacklog<Long> getCommitsBacklog( + InstantsForIncrementalSync instantsForIncrementalSync) { + Table table = Table.forPath(engine, basePath); + Snapshot snapshot = + table.getSnapshotAsOfTimestamp( + engine, Timestamp.from(instantsForIncrementalSync.getLastSyncInstant()).getTime()); + + long versionNumberAtLastSyncInstant = snapshot.getVersion(); + resetState(versionNumberAtLastSyncInstant + 1, engine, table); + return CommitsBacklog.<Long>builder() + .commitsToProcess(getChangesState().getVersionsInSortedOrder()) + .build(); + } + + @Override + public boolean isIncrementalSyncSafeFrom(Instant instant) { + try { + Table table = Table.forPath(engine, basePath); + Snapshot snapshot = table.getSnapshotAsOfTimestamp(engine, Timestamp.from(instant).getTime()); + + // There is a chance earliest commit of the table is returned if the instant is before the + // earliest commit of the table, hence the additional check. + Instant deltaCommitInstant = Instant.ofEpochMilli(snapshot.getTimestamp(engine)); + return deltaCommitInstant.equals(instant) || deltaCommitInstant.isBefore(instant); + } catch (Exception e) { + log.error( + "Error checking if incremental sync is safe from " + instant + ": " + e.getMessage()); Review Comment: We'll want the full stacktrace for debugging ```suggestion "Error checking if incremental sync is safe from " + instant, e); ``` ########## xtable-core/src/main/java/org/apache/xtable/kernel/DeltaKernelDataFileExtractor.java: ########## @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.kernel; + +// import scala.collection.Map; Review Comment: Just remove this instead of commenting it out ########## xtable-core/src/main/java/org/apache/xtable/kernel/DeltaKernelIncrementalChangesState.java: ########## @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.xtable.kernel; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import lombok.Builder; + +import scala.Tuple2; +import scala.collection.JavaConverters; +import scala.collection.Seq; + +import com.google.common.base.Preconditions; + +import io.delta.kernel.Table; +import io.delta.kernel.data.ColumnarBatch; +import io.delta.kernel.data.Row; +import io.delta.kernel.engine.Engine; +import io.delta.kernel.internal.DeltaLogActionUtils; +import io.delta.kernel.internal.TableImpl; +import io.delta.kernel.internal.actions.AddFile; +import io.delta.kernel.internal.actions.RemoveFile; +import io.delta.kernel.internal.actions.RowBackedAction; +import io.delta.kernel.utils.CloseableIterator; + +/** Cache store for storing incremental table changes in the Delta table. */ +public class DeltaKernelIncrementalChangesState { + + private final Map<Long, List<RowBackedAction>> incrementalChangesByVersion = new HashMap<>(); + + /** + * Reloads the cache store with incremental changes. Intentionally thread safety is the + * responsibility of the caller. + * + * @param engine The kernel engine. + * @param versionToStartFrom The version to start from. + */ + @Builder + public DeltaKernelIncrementalChangesState( + Long versionToStartFrom, Engine engine, Table table, Long endVersion) { Review Comment: ```suggestion long versionToStartFrom, Engine engine, Table table, long endVersion) { ``` ########## xtable-core/src/main/java/org/apache/xtable/kernel/DeltaKernelIncrementalChangesState.java: ########## @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.kernel; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import lombok.Builder; + +import scala.Tuple2; +import scala.collection.JavaConverters; +import scala.collection.Seq; + +import com.google.common.base.Preconditions; + +import io.delta.kernel.Table; +import io.delta.kernel.data.ColumnarBatch; +import io.delta.kernel.data.Row; +import io.delta.kernel.engine.Engine; +import io.delta.kernel.internal.DeltaLogActionUtils; +import io.delta.kernel.internal.TableImpl; +import io.delta.kernel.internal.actions.AddFile; +import io.delta.kernel.internal.actions.RemoveFile; +import io.delta.kernel.internal.actions.RowBackedAction; +import io.delta.kernel.utils.CloseableIterator; + +/** Cache store for storing incremental table changes in the Delta table. */ +public class DeltaKernelIncrementalChangesState { + + private final Map<Long, List<RowBackedAction>> incrementalChangesByVersion = new HashMap<>(); + + /** + * Reloads the cache store with incremental changes. Intentionally thread safety is the + * responsibility of the caller. + * + * @param engine The kernel engine. + * @param versionToStartFrom The version to start from. 
+ */ + @Builder + public DeltaKernelIncrementalChangesState( + Long versionToStartFrom, Engine engine, Table table, Long endVersion) { + Set<DeltaLogActionUtils.DeltaAction> actionSet = new HashSet<>(); + actionSet.add(DeltaLogActionUtils.DeltaAction.ADD); + actionSet.add(DeltaLogActionUtils.DeltaAction.REMOVE); + TableImpl tableImpl = (TableImpl) Table.forPath(engine, table.getPath(engine)); + + // getChanges returns CloseableIterator<ColumnarBatch> + try (CloseableIterator<ColumnarBatch> iter = + tableImpl.getChanges(engine, versionToStartFrom, endVersion, actionSet)) { + while (iter.hasNext()) { + ColumnarBatch batch = iter.next(); + int addFileIndex = batch.getSchema().indexOf(DeltaLogActionUtils.DeltaAction.ADD.colName); + int removeFileIndex = + batch.getSchema().indexOf(DeltaLogActionUtils.DeltaAction.REMOVE.colName); + + try (CloseableIterator<Row> rows = batch.getRows()) { + while (rows.hasNext()) { + Row row = rows.next(); + + // Get version (first column) + long version = row.getLong(0); + List<RowBackedAction> actions = + incrementalChangesByVersion.computeIfAbsent(version, k -> new ArrayList<>()); + + if (!row.isNullAt(addFileIndex)) { + Row addFile = row.getStruct(addFileIndex); + AddFile addAction = new AddFile(addFile); + actions.add(addAction); + } + if (!row.isNullAt(removeFileIndex)) { + Row removeFile = row.getStruct(removeFileIndex); + RemoveFile removeAction = new RemoveFile(removeFile); + actions.add(removeAction); + } + } + } + } + } catch (Exception e) { + throw new RuntimeException("Error reading kernel changes", e); Review Comment: Can we catch and throw a more specific exception defined in the XTable repo here? Maybe ReadException? ########## xtable-core/src/main/java/org/apache/xtable/kernel/DeltaKernelStatsExtractor.java: ########## @@ -0,0 +1,315 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.kernel; + +import java.io.IOException; +import java.util.*; Review Comment: Similarly, avoid `*` imports ########## xtable-core/src/main/java/org/apache/xtable/kernel/DeltaKernelTableExtractor.java: ########## @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.xtable.kernel; + +import java.time.Instant; +import java.util.List; +import java.util.stream.Collectors; + +import lombok.Builder; + +import io.delta.kernel.*; +import io.delta.kernel.engine.Engine; +import io.delta.kernel.types.StructField; +import io.delta.kernel.types.StructType; + +import org.apache.xtable.model.InternalTable; +import org.apache.xtable.model.schema.InternalPartitionField; +import org.apache.xtable.model.schema.InternalSchema; +import org.apache.xtable.model.storage.DataLayoutStrategy; +import org.apache.xtable.model.storage.TableFormat; + +/** + * Extracts {@link InternalTable} canonical representation of a table at a point in time for Delta. + */ +@Builder +public class DeltaKernelTableExtractor { + @Builder.Default + private static final DeltaKernelSchemaExtractor schemaExtractor = + DeltaKernelSchemaExtractor.getInstance(); + + private final String basePath; + + public InternalTable table( + Table deltaKernelTable, Snapshot snapshot, Engine engine, String tableName, String basePath) { + try { + // Get schema from Delta Kernel's snapshot + io.delta.kernel.types.StructType schema = snapshot.getSchema(); + InternalSchema internalSchema = schemaExtractor.toInternalSchema(schema); + // Get partition columns); + StructType fullSchema = snapshot.getSchema(); // The full table schema + List<String> partitionColumns = snapshot.getPartitionColumnNames(); // List<String> + List<StructField> partitionFields_strfld = Review Comment: Let's follow the Java naming conventions and remove the underscore from the name. `partitionFieldSchemas` may also be a clearer name. ########## xtable-core/src/main/java/org/apache/xtable/kernel/DeltaKernelTableExtractor.java: ########## @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.kernel; + +import java.time.Instant; +import java.util.List; +import java.util.stream.Collectors; + +import lombok.Builder; + +import io.delta.kernel.*; +import io.delta.kernel.engine.Engine; +import io.delta.kernel.types.StructField; +import io.delta.kernel.types.StructType; + +import org.apache.xtable.model.InternalTable; +import org.apache.xtable.model.schema.InternalPartitionField; +import org.apache.xtable.model.schema.InternalSchema; +import org.apache.xtable.model.storage.DataLayoutStrategy; +import org.apache.xtable.model.storage.TableFormat; + +/** + * Extracts {@link InternalTable} canonical representation of a table at a point in time for Delta. 
+ */ +@Builder +public class DeltaKernelTableExtractor { + @Builder.Default + private static final DeltaKernelSchemaExtractor schemaExtractor = + DeltaKernelSchemaExtractor.getInstance(); + + private final String basePath; + + public InternalTable table( + Table deltaKernelTable, Snapshot snapshot, Engine engine, String tableName, String basePath) { + try { + // Get schema from Delta Kernel's snapshot + io.delta.kernel.types.StructType schema = snapshot.getSchema(); Review Comment: Use the imported StructType instead of relying on the verbose, fully-qualified class name ########## xtable-core/src/main/java/org/apache/xtable/kernel/DeltaKernelActionsConverter.java: ########## @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.xtable.kernel; + +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import lombok.AccessLevel; +import lombok.NoArgsConstructor; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; + +import io.delta.kernel.Table; +import io.delta.kernel.defaults.engine.DefaultEngine; +import io.delta.kernel.engine.Engine; +import io.delta.kernel.internal.actions.AddFile; +import io.delta.kernel.internal.actions.RemoveFile; + +import org.apache.xtable.exception.NotSupportedException; +import org.apache.xtable.model.schema.InternalField; +import org.apache.xtable.model.schema.InternalPartitionField; +import org.apache.xtable.model.stat.ColumnStat; +import org.apache.xtable.model.stat.FileStats; +import org.apache.xtable.model.storage.FileFormat; +import org.apache.xtable.model.storage.InternalDataFile; + +@NoArgsConstructor(access = AccessLevel.PRIVATE) +public class DeltaKernelActionsConverter { + private static final DeltaKernelActionsConverter INSTANCE = new DeltaKernelActionsConverter(); + + public static DeltaKernelActionsConverter getInstance() { + return INSTANCE; + } + + public InternalDataFile convertAddActionToInternalDataFile( + AddFile addFile, + Table table, + FileFormat fileFormat, + List<InternalPartitionField> partitionFields, + List<InternalField> fields, + boolean includeColumnStats, + DeltaKernelPartitionExtractor partitionExtractor, + DeltaKernelStatsExtractor fileStatsExtractor, + Map<String, String> partitionValues) { + FileStats fileStats = fileStatsExtractor.getColumnStatsForFile(addFile, fields); + List<ColumnStat> columnStats = + includeColumnStats ? 
fileStats.getColumnStats() : Collections.emptyList(); + long recordCount = fileStats.getNumRecords(); + + java.util.Map<String, String> scalaMap = partitionValues; + + return InternalDataFile.builder() + .physicalPath(getFullPathToFile(addFile.getPath(), table)) + .fileFormat(fileFormat) + .fileSizeBytes(addFile.getSize()) + .lastModified(addFile.getModificationTime()) + .partitionValues(partitionExtractor.partitionValueExtraction(scalaMap, partitionFields)) + .columnStats(columnStats) + .recordCount(recordCount) + .build(); + } + + public InternalDataFile convertRemoveActionToInternalDataFile( + RemoveFile removeFile, + Table table, + FileFormat fileFormat, + List<InternalPartitionField> partitionFields, + DeltaKernelPartitionExtractor partitionExtractor, + Map<String, String> partitionValues) { + java.util.Map<String, String> scalaMap = partitionValues; Review Comment: Same here: `Map` is already imported, so use it instead of the fully qualified `java.util.Map` ########## xtable-core/src/main/java/org/apache/xtable/kernel/DeltaKernelTableExtractor.java: ########## @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.xtable.kernel; + +import java.time.Instant; +import java.util.List; +import java.util.stream.Collectors; + +import lombok.Builder; + +import io.delta.kernel.*; +import io.delta.kernel.engine.Engine; +import io.delta.kernel.types.StructField; +import io.delta.kernel.types.StructType; + +import org.apache.xtable.model.InternalTable; +import org.apache.xtable.model.schema.InternalPartitionField; +import org.apache.xtable.model.schema.InternalSchema; +import org.apache.xtable.model.storage.DataLayoutStrategy; +import org.apache.xtable.model.storage.TableFormat; + +/** + * Extracts {@link InternalTable} canonical representation of a table at a point in time for Delta. + */ +@Builder +public class DeltaKernelTableExtractor { + @Builder.Default + private static final DeltaKernelSchemaExtractor schemaExtractor = + DeltaKernelSchemaExtractor.getInstance(); + + private final String basePath; + + public InternalTable table( + Table deltaKernelTable, Snapshot snapshot, Engine engine, String tableName, String basePath) { + try { + // Get schema from Delta Kernel's snapshot + io.delta.kernel.types.StructType schema = snapshot.getSchema(); + InternalSchema internalSchema = schemaExtractor.toInternalSchema(schema); + // Get partition columns); + StructType fullSchema = snapshot.getSchema(); // The full table schema + List<String> partitionColumns = snapshot.getPartitionColumnNames(); // List<String> + List<StructField> partitionFields_strfld = + fullSchema.fields().stream() + .filter(field -> partitionColumns.contains(field.getName())) + .collect(Collectors.toList()); + StructType partitionSchema = new StructType(partitionFields_strfld); + + List<InternalPartitionField> partitionFields = + DeltaKernelPartitionExtractor.getInstance() + .convertFromDeltaPartitionFormat(internalSchema, partitionSchema); + + DataLayoutStrategy dataLayoutStrategy = + !partitionFields.isEmpty() + ? 
DataLayoutStrategy.HIVE_STYLE_PARTITION + : DataLayoutStrategy.FLAT; + + // Get the timestamp + long timestamp = snapshot.getTimestamp(engine); + return InternalTable.builder() + .tableFormat(TableFormat.DELTA) + .basePath(basePath) + .name(tableName) + .layoutStrategy(dataLayoutStrategy) + .partitioningFields(partitionFields) + .readSchema(internalSchema) + .latestCommitTime(Instant.ofEpochMilli(timestamp)) + .latestMetadataPath(basePath + "/_delta_log") + .build(); + } catch (Exception e) { Review Comment: Are there more specific exceptions we can catch here? Let's also throw a more specific exception than the generic RuntimeException ########## xtable-core/src/main/java/org/apache/xtable/kernel/DeltaKernelActionsConverter.java: ########## @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.xtable.kernel; + +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import lombok.AccessLevel; +import lombok.NoArgsConstructor; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; + +import io.delta.kernel.Table; +import io.delta.kernel.defaults.engine.DefaultEngine; +import io.delta.kernel.engine.Engine; +import io.delta.kernel.internal.actions.AddFile; +import io.delta.kernel.internal.actions.RemoveFile; + +import org.apache.xtable.exception.NotSupportedException; +import org.apache.xtable.model.schema.InternalField; +import org.apache.xtable.model.schema.InternalPartitionField; +import org.apache.xtable.model.stat.ColumnStat; +import org.apache.xtable.model.stat.FileStats; +import org.apache.xtable.model.storage.FileFormat; +import org.apache.xtable.model.storage.InternalDataFile; + +@NoArgsConstructor(access = AccessLevel.PRIVATE) +public class DeltaKernelActionsConverter { + private static final DeltaKernelActionsConverter INSTANCE = new DeltaKernelActionsConverter(); + + public static DeltaKernelActionsConverter getInstance() { + return INSTANCE; + } + + public InternalDataFile convertAddActionToInternalDataFile( + AddFile addFile, + Table table, + FileFormat fileFormat, + List<InternalPartitionField> partitionFields, + List<InternalField> fields, + boolean includeColumnStats, + DeltaKernelPartitionExtractor partitionExtractor, + DeltaKernelStatsExtractor fileStatsExtractor, + Map<String, String> partitionValues) { + FileStats fileStats = fileStatsExtractor.getColumnStatsForFile(addFile, fields); + List<ColumnStat> columnStats = + includeColumnStats ? 
fileStats.getColumnStats() : Collections.emptyList(); + long recordCount = fileStats.getNumRecords(); + + java.util.Map<String, String> scalaMap = partitionValues; Review Comment: Add an import here for Map ########## xtable-core/src/main/java/org/apache/xtable/kernel/DeltaKernelDataFileExtractor.java: ########## @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.kernel; + +// import scala.collection.Map; +import java.util.*; Review Comment: Avoid `*` import ########## xtable-core/src/main/java/org/apache/xtable/kernel/DeltaKernelDataFileExtractor.java: ########## @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.kernel; + +// import scala.collection.Map; +import java.util.*; +import java.util.List; +import java.util.stream.Collectors; + +import lombok.Builder; + +import io.delta.kernel.Snapshot; +import io.delta.kernel.Table; +import io.delta.kernel.data.FilteredColumnarBatch; +import io.delta.kernel.data.Row; +import io.delta.kernel.engine.Engine; +import io.delta.kernel.internal.InternalScanFileUtils; +import io.delta.kernel.internal.ScanImpl; +import io.delta.kernel.internal.SnapshotImpl; +import io.delta.kernel.internal.actions.AddFile; +import io.delta.kernel.types.StructField; +import io.delta.kernel.types.StructType; +import io.delta.kernel.utils.CloseableIterator; + +import org.apache.xtable.model.schema.InternalField; +import org.apache.xtable.model.schema.InternalPartitionField; +import org.apache.xtable.model.schema.InternalSchema; +import org.apache.xtable.model.storage.FileFormat; +import org.apache.xtable.model.storage.InternalDataFile; +import org.apache.xtable.spi.extractor.DataFileIterator; + +/** DeltaDataFileExtractor lets the consumer iterate over partitions. 
*/ +@Builder +public class DeltaKernelDataFileExtractor { + + @Builder.Default + private final DeltaKernelPartitionExtractor partitionExtractor = + DeltaKernelPartitionExtractor.getInstance(); + + @Builder.Default + private final DeltaKernelStatsExtractor fileStatsExtractor = + DeltaKernelStatsExtractor.getInstance(); + + @Builder.Default + private final DeltaKernelActionsConverter actionsConverter = + DeltaKernelActionsConverter.getInstance(); + + private final String basePath; + + /** + * Initializes an iterator for Delta Lake files. + * + * @return Delta table file iterator + */ + public DataFileIterator iterator( + Snapshot deltaSnapshot, Table table, Engine engine, InternalSchema schema) { + return new DeltaDataFileIterator(deltaSnapshot, table, engine, schema, true); + } + + public class DeltaDataFileIterator implements DataFileIterator { + private final FileFormat fileFormat; + private final List<InternalField> fields; + private final List<InternalPartitionField> partitionFields; + private Iterator<InternalDataFile> dataFilesIterator = Collections.emptyIterator(); Review Comment: It looks like this is always set below so let's make this `final` and remove the `Collections.emptyIterator()` here ########## xtable-core/src/main/java/org/apache/xtable/kernel/DeltaKernelIncrementalChangesState.java: ########## @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.kernel; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import lombok.Builder; + +import scala.Tuple2; +import scala.collection.JavaConverters; +import scala.collection.Seq; + +import com.google.common.base.Preconditions; + +import io.delta.kernel.Table; +import io.delta.kernel.data.ColumnarBatch; +import io.delta.kernel.data.Row; +import io.delta.kernel.engine.Engine; +import io.delta.kernel.internal.DeltaLogActionUtils; +import io.delta.kernel.internal.TableImpl; +import io.delta.kernel.internal.actions.AddFile; +import io.delta.kernel.internal.actions.RemoveFile; +import io.delta.kernel.internal.actions.RowBackedAction; +import io.delta.kernel.utils.CloseableIterator; + +/** Cache store for storing incremental table changes in the Delta table. */ +public class DeltaKernelIncrementalChangesState { + + private final Map<Long, List<RowBackedAction>> incrementalChangesByVersion = new HashMap<>(); + + /** + * Reloads the cache store with incremental changes. Intentionally thread safety is the + * responsibility of the caller. + * + * @param engine The kernel engine. + * @param versionToStartFrom The version to start from. 
+ */ + @Builder + public DeltaKernelIncrementalChangesState( + Long versionToStartFrom, Engine engine, Table table, Long endVersion) { + Set<DeltaLogActionUtils.DeltaAction> actionSet = new HashSet<>(); + actionSet.add(DeltaLogActionUtils.DeltaAction.ADD); + actionSet.add(DeltaLogActionUtils.DeltaAction.REMOVE); + TableImpl tableImpl = (TableImpl) Table.forPath(engine, table.getPath(engine)); + + // getChanges returns CloseableIterator<ColumnarBatch> + try (CloseableIterator<ColumnarBatch> iter = + tableImpl.getChanges(engine, versionToStartFrom, endVersion, actionSet)) { + while (iter.hasNext()) { + ColumnarBatch batch = iter.next(); + int addFileIndex = batch.getSchema().indexOf(DeltaLogActionUtils.DeltaAction.ADD.colName); + int removeFileIndex = + batch.getSchema().indexOf(DeltaLogActionUtils.DeltaAction.REMOVE.colName); + + try (CloseableIterator<Row> rows = batch.getRows()) { + while (rows.hasNext()) { + Row row = rows.next(); + + // Get version (first column) + long version = row.getLong(0); + List<RowBackedAction> actions = + incrementalChangesByVersion.computeIfAbsent(version, k -> new ArrayList<>()); + + if (!row.isNullAt(addFileIndex)) { + Row addFile = row.getStruct(addFileIndex); + AddFile addAction = new AddFile(addFile); + actions.add(addAction); + } + if (!row.isNullAt(removeFileIndex)) { + Row removeFile = row.getStruct(removeFileIndex); + RemoveFile removeAction = new RemoveFile(removeFile); + actions.add(removeAction); + } + } + } + } + } catch (Exception e) { + throw new RuntimeException("Error reading kernel changes", e); + } + } + + /** + * Returns the versions in sorted order. The start version is the next one after the last sync + * version to the target. The end version is the latest version in the Delta table at the time of + * initialization. 
+ * + * @return + */ + public List<Long> getVersionsInSortedOrder() { + List<Long> versions = new ArrayList<>(incrementalChangesByVersion.keySet()); + versions.sort(Long::compareTo); + return versions; + } + + public List<RowBackedAction> getActionsForVersion(Long version) { + Preconditions.checkArgument( + incrementalChangesByVersion.containsKey(version), + String.format("Version %s not found in the DeltaIncrementalChangesState.", version)); + return incrementalChangesByVersion.get(version); + } + + private List<Tuple2<Long, List<RowBackedAction>>> getChangesList( + scala.collection.Iterator<Tuple2<Object, Seq<RowBackedAction>>> scalaIterator) { + List<Tuple2<Long, List<RowBackedAction>>> changesList = new ArrayList<>(); + Iterator<Tuple2<Object, Seq<RowBackedAction>>> javaIterator = + JavaConverters.asJavaIteratorConverter(scalaIterator).asJava(); + while (javaIterator.hasNext()) { + Tuple2<Object, Seq<RowBackedAction>> currentChange = javaIterator.next(); + changesList.add( + new Tuple2<>( + (Long) currentChange._1(), + JavaConverters.seqAsJavaListConverter(currentChange._2()).asJava())); + } + return changesList; + } Review Comment: This code is showing up as unused, can it be removed? ########## xtable-core/src/main/java/org/apache/xtable/kernel/DeltaKernelDataFileExtractor.java: ########## @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.kernel; + +// import scala.collection.Map; +import java.util.*; +import java.util.List; +import java.util.stream.Collectors; + +import lombok.Builder; + +import io.delta.kernel.Snapshot; +import io.delta.kernel.Table; +import io.delta.kernel.data.FilteredColumnarBatch; +import io.delta.kernel.data.Row; +import io.delta.kernel.engine.Engine; +import io.delta.kernel.internal.InternalScanFileUtils; +import io.delta.kernel.internal.ScanImpl; +import io.delta.kernel.internal.SnapshotImpl; +import io.delta.kernel.internal.actions.AddFile; +import io.delta.kernel.types.StructField; +import io.delta.kernel.types.StructType; +import io.delta.kernel.utils.CloseableIterator; + +import org.apache.xtable.model.schema.InternalField; +import org.apache.xtable.model.schema.InternalPartitionField; +import org.apache.xtable.model.schema.InternalSchema; +import org.apache.xtable.model.storage.FileFormat; +import org.apache.xtable.model.storage.InternalDataFile; +import org.apache.xtable.spi.extractor.DataFileIterator; + +/** DeltaDataFileExtractor lets the consumer iterate over partitions. 
*/ +@Builder +public class DeltaKernelDataFileExtractor { + + @Builder.Default + private final DeltaKernelPartitionExtractor partitionExtractor = + DeltaKernelPartitionExtractor.getInstance(); + + @Builder.Default + private final DeltaKernelStatsExtractor fileStatsExtractor = + DeltaKernelStatsExtractor.getInstance(); + + @Builder.Default + private final DeltaKernelActionsConverter actionsConverter = + DeltaKernelActionsConverter.getInstance(); + + private final String basePath; + + /** + * Initializes an iterator for Delta Lake files. + * + * @return Delta table file iterator + */ + public DataFileIterator iterator( + Snapshot deltaSnapshot, Table table, Engine engine, InternalSchema schema) { + return new DeltaDataFileIterator(deltaSnapshot, table, engine, schema, true); + } + + public class DeltaDataFileIterator implements DataFileIterator { + private final FileFormat fileFormat; + private final List<InternalField> fields; + private final List<InternalPartitionField> partitionFields; + private Iterator<InternalDataFile> dataFilesIterator = Collections.emptyIterator(); + + private DeltaDataFileIterator( + Snapshot snapshot, + Table table, + Engine engine, + InternalSchema schema, + boolean includeColumnStats) { + String provider = ((SnapshotImpl) snapshot).getMetadata().getFormat().getProvider(); + this.fileFormat = actionsConverter.convertToFileFormat(provider); + + this.fields = schema.getFields(); + + StructType fullSchema = snapshot.getSchema(); // The full table schema + List<String> partitionColumns = snapshot.getPartitionColumnNames(); + + List<StructField> partitionFieldsStr = + fullSchema.fields().stream() + .filter(field -> partitionColumns.contains(field.getName())) + .collect(Collectors.toList()); + + StructType partitionSchema = new StructType(partitionFieldsStr); + + this.partitionFields = + partitionExtractor.convertFromDeltaPartitionFormat(schema, partitionSchema); + + ScanImpl myScan = (ScanImpl) snapshot.getScanBuilder().build(); + 
CloseableIterator<FilteredColumnarBatch> scanFiles = + myScan.getScanFiles(engine, includeColumnStats); + + List<InternalDataFile> dataFiles = new ArrayList<>(); + while (scanFiles.hasNext()) { Review Comment: This is eagerly pulling all the files into memory instead of using the iterator pattern. Can the code be updated to iterate through these values when `next` is called instead of eagerly materializing them? ########## xtable-core/src/main/java/org/apache/xtable/kernel/DeltaKernelTableExtractor.java: ########## @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.xtable.kernel; + +import java.time.Instant; +import java.util.List; +import java.util.stream.Collectors; + +import lombok.Builder; + +import io.delta.kernel.*; +import io.delta.kernel.engine.Engine; +import io.delta.kernel.types.StructField; +import io.delta.kernel.types.StructType; + +import org.apache.xtable.model.InternalTable; +import org.apache.xtable.model.schema.InternalPartitionField; +import org.apache.xtable.model.schema.InternalSchema; +import org.apache.xtable.model.storage.DataLayoutStrategy; +import org.apache.xtable.model.storage.TableFormat; + +/** + * Extracts {@link InternalTable} canonical representation of a table at a point in time for Delta. + */ +@Builder +public class DeltaKernelTableExtractor { + @Builder.Default + private static final DeltaKernelSchemaExtractor schemaExtractor = + DeltaKernelSchemaExtractor.getInstance(); + + private final String basePath; + + public InternalTable table( + Table deltaKernelTable, Snapshot snapshot, Engine engine, String tableName, String basePath) { + try { + // Get schema from Delta Kernel's snapshot + io.delta.kernel.types.StructType schema = snapshot.getSchema(); + InternalSchema internalSchema = schemaExtractor.toInternalSchema(schema); + // Get partition columns); Review Comment: ```suggestion // Get partition columns ``` ########## xtable-core/src/main/java/org/apache/xtable/kernel/DeltaKernelPartitionExtractor.java: ########## @@ -0,0 +1,535 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.kernel; + +import static org.apache.xtable.collectors.CustomCollectors.toList; +import static org.apache.xtable.delta.DeltaValueConverter.convertFromDeltaPartitionValue; +import static org.apache.xtable.delta.DeltaValueConverter.convertToDeltaPartitionValue; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +import lombok.AccessLevel; +import lombok.Builder; +import lombok.NoArgsConstructor; +import lombok.extern.log4j.Log4j2; + +import scala.collection.JavaConverters; + +import com.google.common.collect.Iterators; +import com.google.common.collect.PeekingIterator; + +import io.delta.kernel.types.*; +import io.delta.kernel.types.FieldMetadata; + +import org.apache.xtable.exception.PartitionSpecException; +import org.apache.xtable.model.schema.InternalPartitionField; +import org.apache.xtable.model.schema.InternalSchema; +import org.apache.xtable.model.schema.PartitionTransformType; +import org.apache.xtable.model.stat.PartitionValue; +import org.apache.xtable.model.stat.Range; +import org.apache.xtable.model.storage.InternalDataFile; +import org.apache.xtable.schema.SchemaFieldFinder; + +@Log4j2 +@NoArgsConstructor(access = AccessLevel.PRIVATE) +public class DeltaKernelPartitionExtractor { + private static final DeltaKernelPartitionExtractor INSTANCE = new 
DeltaKernelPartitionExtractor(); + private static final String CAST_FUNCTION = "CAST(%s as DATE)"; + private static final String DATE_FORMAT_FUNCTION = "DATE_FORMAT(%s, '%s')"; + private static final String YEAR_FUNCTION = "YEAR(%s)"; + private static final String DATE_FORMAT_FOR_HOUR = "yyyy-MM-dd-HH"; + private static final String DATE_FORMAT_FOR_DAY = "yyyy-MM-dd"; + private static final String DATE_FORMAT_FOR_MONTH = "yyyy-MM"; + private static final String DATE_FORMAT_FOR_YEAR = "yyyy"; + private static final String BUCKET_FUNCTION = "MOD((HASH(%s) & %d), %d)"; + // For timestamp partition fields, actual partition column names in delta format will be of type + // generated & and with a name like `delta_partition_col_{transform_type}_{source_field_name}`. + private static final String DELTA_PARTITION_COL_NAME_FORMAT = "xtable_partition_col_%s_%s"; + static final String DELTA_GENERATION_EXPRESSION = "delta.generationExpression"; + private static final List<ParsedGeneratedExpr.GeneratedExprType> GRANULARITIES = + Arrays.asList( + ParsedGeneratedExpr.GeneratedExprType.YEAR, + ParsedGeneratedExpr.GeneratedExprType.MONTH, + ParsedGeneratedExpr.GeneratedExprType.DAY, + ParsedGeneratedExpr.GeneratedExprType.HOUR); + + public static DeltaKernelPartitionExtractor getInstance() { + return INSTANCE; + } + + /** + * Extracts partition fields from delta table. Partitioning by nested columns isn't supported. + * Example: Given a delta table and a reference to DeltaLog, method parameters can be obtained by + * deltaLog = DeltaLog.forTable(spark, deltaTablePath); InternalSchema internalSchema = + * DeltaSchemaExtractor.getInstance().toInternalSchema(deltaLog.snapshot().schema()); StructType + * partitionSchema = deltaLog.metadata().partitionSchema(); + * + * @param internalSchema canonical representation of the schema. + * @param partitionSchema partition schema of the delta table. 
+ * @return list of canonical representation of the partition fields + */ + public List<InternalPartitionField> convertFromDeltaPartitionFormat( + InternalSchema internalSchema, StructType partitionSchema) { + if (partitionSchema.fields().size() == 0) { + return Collections.emptyList(); + } + return getInternalPartitionFields(partitionSchema, internalSchema); + } + + /** + * If all of them are value process individually and return. If they contain month they should + * contain year as well. If they contain day they should contain month and year as well. If they + * contain hour they should contain day, month and year as well. Other supports CAST(col as DATE) + * and DATE_FORMAT(col, 'yyyy-MM-dd'). Partition by nested fields may not be fully supported. + */ + private List<InternalPartitionField> getInternalPartitionFields( + StructType partitionSchema, InternalSchema internalSchema) { + PeekingIterator<StructField> itr = + Iterators.peekingIterator(partitionSchema.fields().iterator()); + List<InternalPartitionField> partitionFields = new ArrayList<>(partitionSchema.fields().size()); + while (itr.hasNext()) { + StructField currPartitionField = itr.peek(); + if (!currPartitionField.getMetadata().contains(DELTA_GENERATION_EXPRESSION)) { + partitionFields.add( + InternalPartitionField.builder() + .sourceField( + SchemaFieldFinder.getInstance() + .findFieldByPath(internalSchema, currPartitionField.getName())) + .transformType(PartitionTransformType.VALUE) + .build()); + itr.next(); // consume the field. + } else { + // Partition contains generated expression. + // if it starts with year we should consume until we hit field with no generated expression + // or we hit a field with generated expression that is of cast or date format. 
+ String expr = currPartitionField.getMetadata().getString(DELTA_GENERATION_EXPRESSION); + ParsedGeneratedExpr parsedGeneratedExpr = + ParsedGeneratedExpr.buildFromString(currPartitionField.getName(), expr); + if (ParsedGeneratedExpr.GeneratedExprType.CAST == parsedGeneratedExpr.generatedExprType) { + partitionFields.add( + getPartitionWithDateTransform( + currPartitionField.getName(), parsedGeneratedExpr, internalSchema)); + itr.next(); // consume the field. + } else if (ParsedGeneratedExpr.GeneratedExprType.DATE_FORMAT + == parsedGeneratedExpr.generatedExprType) { + partitionFields.add( + getPartitionWithDateFormatTransform( + currPartitionField.getName(), parsedGeneratedExpr, internalSchema)); + itr.next(); // consume the field. + } else { + // consume until we hit field with no generated expression or generated expression + // that is not of type cast or date format. + List<ParsedGeneratedExpr> parsedGeneratedExprs = new ArrayList<>(); + while (itr.hasNext() + && currPartitionField.getMetadata().contains(DELTA_GENERATION_EXPRESSION)) { + expr = currPartitionField.getMetadata().getString(DELTA_GENERATION_EXPRESSION); + parsedGeneratedExpr = + ParsedGeneratedExpr.buildFromString(currPartitionField.getName(), expr); + + if (ParsedGeneratedExpr.GeneratedExprType.CAST == parsedGeneratedExpr.generatedExprType + || ParsedGeneratedExpr.GeneratedExprType.DATE_FORMAT + == parsedGeneratedExpr.generatedExprType) { + break; + } + parsedGeneratedExprs.add(parsedGeneratedExpr); + itr.next(); // consume the field + if (itr.hasNext()) { + currPartitionField = itr.peek(); + } + } + partitionFields.add( + getPartitionColumnsForHourOrDayOrMonthOrYear(parsedGeneratedExprs, internalSchema)); + } + } + } + return partitionFields; + } + + private InternalPartitionField getPartitionColumnsForHourOrDayOrMonthOrYear( + List<ParsedGeneratedExpr> parsedGeneratedExprs, InternalSchema internalSchema) { + if (parsedGeneratedExprs.size() > 4) { + throw new IllegalStateException("Invalid 
partition transform"); + } + validate( + parsedGeneratedExprs, new HashSet<>(GRANULARITIES.subList(0, parsedGeneratedExprs.size()))); + + ParsedGeneratedExpr transform = parsedGeneratedExprs.get(0); + List<String> partitionColumns = + parsedGeneratedExprs.stream() + .map(parsedGeneratedExpr -> parsedGeneratedExpr.partitionColumnName) + .collect(toList(parsedGeneratedExprs.size())); + return InternalPartitionField.builder() + .sourceField( + SchemaFieldFinder.getInstance().findFieldByPath(internalSchema, transform.sourceColumn)) + .partitionFieldNames(partitionColumns) + .transformType( + parsedGeneratedExprs.get(parsedGeneratedExprs.size() - 1) + .internalPartitionTransformType) + .build(); + } + + // Cast has default format of yyyy-MM-dd. + private InternalPartitionField getPartitionWithDateTransform( + String partitionColumnName, + ParsedGeneratedExpr parsedGeneratedExpr, + InternalSchema internalSchema) { + return InternalPartitionField.builder() + .sourceField( + SchemaFieldFinder.getInstance() + .findFieldByPath(internalSchema, parsedGeneratedExpr.sourceColumn)) + .partitionFieldNames(Collections.singletonList(partitionColumnName)) + .transformType(PartitionTransformType.DAY) + .build(); + } + + private InternalPartitionField getPartitionWithDateFormatTransform( + String partitionColumnName, + ParsedGeneratedExpr parsedGeneratedExpr, + InternalSchema internalSchema) { + return InternalPartitionField.builder() + .sourceField( + SchemaFieldFinder.getInstance() + .findFieldByPath(internalSchema, parsedGeneratedExpr.sourceColumn)) + .partitionFieldNames(Collections.singletonList(partitionColumnName)) + .transformType(parsedGeneratedExpr.internalPartitionTransformType) + .build(); + } + + public Map<String, StructField> convertToDeltaPartitionFormat( + List<InternalPartitionField> partitionFields) { + if (partitionFields == null) { + return null; + } + Map<String, StructField> nameToStructFieldMap = new HashMap<>(); + for (InternalPartitionField 
internalPartitionField : partitionFields) { + String currPartitionColumnName; + StructField field; + + if (internalPartitionField.getTransformType() == PartitionTransformType.VALUE) { + currPartitionColumnName = internalPartitionField.getSourceField().getName(); + field = null; + } else { + // Since partition field of timestamp or bucket type, create new field in schema. + field = getGeneratedField(internalPartitionField); + currPartitionColumnName = field.getName(); + } + nameToStructFieldMap.put(currPartitionColumnName, field); + } + return nameToStructFieldMap; + } + + public Map<String, String> partitionValueSerialization(InternalDataFile internalDataFile) { + Map<String, String> partitionValuesSerialized = new HashMap<>(); + if (internalDataFile.getPartitionValues() == null + || internalDataFile.getPartitionValues().isEmpty()) { + return partitionValuesSerialized; + } + for (PartitionValue partitionValue : internalDataFile.getPartitionValues()) { + InternalPartitionField partitionField = partitionValue.getPartitionField(); + PartitionTransformType transformType = partitionField.getTransformType(); + String partitionValueSerialized; + if (transformType == PartitionTransformType.VALUE) { + partitionValueSerialized = + convertToDeltaPartitionValue( + partitionValue.getRange().getMaxValue(), + partitionField.getSourceField().getSchema().getDataType(), + transformType, + ""); + partitionValuesSerialized.put( + partitionField.getSourceField().getName(), partitionValueSerialized); + } else if (transformType == PartitionTransformType.BUCKET) { + partitionValueSerialized = partitionValue.getRange().getMaxValue().toString(); + partitionValuesSerialized.put( + getGeneratedColumnName(partitionField), partitionValueSerialized); + } else { + // use appropriate date formatter for value serialization. 
+ partitionValueSerialized = + convertToDeltaPartitionValue( + partitionValue.getRange().getMaxValue(), + partitionField.getSourceField().getSchema().getDataType(), + transformType, + getDateFormat(partitionField.getTransformType())); + partitionValuesSerialized.put( + getGeneratedColumnName(partitionField), partitionValueSerialized); + } + } + return partitionValuesSerialized; + } + + public List<PartitionValue> partitionValueExtraction( + java.util.Map<String, String> values, List<InternalPartitionField> partitionFields) { Review Comment: Let's add an import for Map in this class as well -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
