nsivabalan commented on code in PR #5958: URL: https://github.com/apache/hudi/pull/5958#discussion_r985130392
########## hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java: ########## @@ -234,6 +234,13 @@ public final class HoodieMetadataConfig extends HoodieConfig { + "metadata table which are never added before. This config determines how to handle " + "such spurious deletes"); + public static final ConfigProperty<Boolean> USE_LOG_RECORD_READER_SCAN_V2 = ConfigProperty + .key(METADATA_PREFIX + ".log.record.reader.use.scanV2") + .defaultValue(false) + .sinceVersion("0.10.10") Review Comment: This should be 0.13.0. ########## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/RunCompactionActionExecutor.java: ########## @@ -50,29 +49,35 @@ private final HoodieCompactor compactor; private final HoodieCompactionHandler compactionHandler; + private WriteOperationType operationType; public RunCompactionActionExecutor(HoodieEngineContext context, HoodieWriteConfig config, HoodieTable table, String instantTime, HoodieCompactor compactor, - HoodieCompactionHandler compactionHandler) { + HoodieCompactionHandler compactionHandler, + WriteOperationType operationType) { super(context, config, table, instantTime); this.compactor = compactor; this.compactionHandler = compactionHandler; + this.operationType = operationType; Review Comment: Can we assert that the type is either COMPACT or LOG_COMPACT? The else blocks in the methods below don't check the type, so it could be anything. ########## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/CompactionExecutionStrategy.java: ########## @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.table.action.compact; + +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.common.model.CompactionOperation; +import org.apache.hudi.common.model.HoodieBaseFile; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner; +import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.table.HoodieCompactionHandler; +import org.apache.hudi.table.HoodieTable; + +import java.io.IOException; +import java.io.Serializable; +import java.util.Iterator; +import java.util.List; + +public class CompactionExecutionStrategy<T extends HoodieRecordPayload, I, K, O> implements Serializable { + + protected void transitionRequestedToInflight(HoodieTable table, String compactionInstantTime) { + HoodieActiveTimeline timeline = table.getActiveTimeline(); + HoodieInstant instant = HoodieTimeline.getCompactionRequestedInstant(compactionInstantTime); + // Mark instant as compaction inflight + timeline.transitionCompactionRequestedToInflight(instant); + table.getMetaClient().reloadActiveTimeline(); + } + + protected String instantTimeToUseForScanning(String compactionInstantTime, String maxInstantTime) { + return maxInstantTime; + } + + protected boolean shouldPreserveCommitMetadata() { + return false; Review Comment: 
This is not right. We should fetch this from CompactionConfig. ########## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java: ########## @@ -379,11 +407,16 @@ public void doAppend() { flushToDiskIfRequired(record); writeToBuffer(record); } - appendDataAndDeleteBlocks(header); + appendDataAndDeleteBlocks(header, true); estimatedNumberOfBytesWritten += averageRecordSize * numberOfRecords; } - protected void appendDataAndDeleteBlocks(Map<HeaderMetadataType, String> header) { + /** + * Appends data and delete blocks. When appendDeleteBlocks value is false, only data blocks are appended. + * This is done so that all the data blocks are created first and then a single delete block is added. + * Otherwise what can end up happening is creation of multiple small delete blocks get added after each data block. Review Comment: Gotcha. Can you remind me in which case the deleteBlock will be non-empty while writing the output for log compaction? We would already have removed the records to be deleted from the hashmap. This is akin to our regular compaction, right? There we take the base files plus the log blocks (some of which could be delete blocks) and write out a new base file containing only valid records, so there is no need to add a delete block. With log compaction, I would likewise expect no delete blocks to be written.
########## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java: ########## @@ -240,8 +245,9 @@ private Pair<Boolean, List<CleanFileInfo>> getFilesToCleanKeepingLatestVersions( int keepVersions = config.getCleanerFileVersionsRetained(); // do not cleanup slice required for pending compaction Iterator<FileSlice> fileSliceIterator = - fileGroup.getAllFileSlices().filter(fs -> !isFileSliceNeededForPendingCompaction(fs)).iterator(); - if (isFileGroupInPendingCompaction(fileGroup)) { + fileGroup.getAllFileSlices().filter(fs -> !isFileSliceNeededForPendingCompaction(fs) + && !isFileSliceNeededForPendingLogCompaction(fs)).iterator(); + if (isFileGroupInPendingCompaction(fileGroup) || isFileGroupInPendingLogCompaction(fileGroup)) { Review Comment: Sorry, I don't see this being addressed. ``` if (isFileGroupInPendingCompaction(fileGroup)) { // We have already saved the last version of file-groups for pending compaction Id keepVersions--; } ``` In fact, you removed isFileGroupInPendingLogCompaction from the if condition but did not fix the other implementation. ########## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/BaseHoodieCompactionPlanGenerator.java: ########## @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.table.action.compact.plan.generators; + +import org.apache.hudi.avro.model.HoodieCompactionOperation; +import org.apache.hudi.avro.model.HoodieCompactionPlan; +import org.apache.hudi.common.data.HoodieAccumulator; +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.CompactionOperation; +import org.apache.hudi.common.model.FileSlice; +import org.apache.hudi.common.model.HoodieBaseFile; +import org.apache.hudi.common.model.HoodieFileGroupId; +import org.apache.hudi.common.model.HoodieLogFile; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.table.view.SyncableFileSystemView; +import org.apache.hudi.common.util.CollectionUtils; +import org.apache.hudi.common.util.CompactionUtils; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.table.HoodieTable; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +import java.io.IOException; +import java.io.Serializable; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +import static java.util.stream.Collectors.toList; + +public abstract class BaseHoodieCompactionPlanGenerator<T extends HoodieRecordPayload, I, K, O> implements Serializable { + private static final Logger LOG = LogManager.getLogger(BaseHoodieCompactionPlanGenerator.class); + + protected final HoodieTable<T, I, K, O> hoodieTable; + protected final HoodieWriteConfig writeConfig; + protected final transient 
HoodieEngineContext engineContext; + + public BaseHoodieCompactionPlanGenerator(HoodieTable table, HoodieEngineContext engineContext, HoodieWriteConfig writeConfig) { + this.hoodieTable = table; + this.writeConfig = writeConfig; + this.engineContext = engineContext; + } + + public HoodieCompactionPlan generateCompactionPlan() throws IOException { + // Accumulator to keep track of total log files for a table + HoodieAccumulator totalLogFiles = engineContext.newAccumulator(); + // Accumulator to keep track of total log file slices for a table + HoodieAccumulator totalFileSlices = engineContext.newAccumulator(); + + // TODO : check if maxMemory is not greater than JVM or executor memory + // TODO - rollback any compactions in flight + HoodieTableMetaClient metaClient = hoodieTable.getMetaClient(); + List<String> partitionPaths = FSUtils.getAllPartitionPaths(engineContext, writeConfig.getMetadataConfig(), metaClient.getBasePath()); + + // filter the partition paths if needed to reduce list status + partitionPaths = filterPartitionPathsByStrategy(writeConfig, partitionPaths); + + if (partitionPaths.isEmpty()) { + // In case no partitions could be picked, return no compaction plan + return null; + } + LOG.info("Looking for files to compact in " + partitionPaths + " partitions"); + engineContext.setJobStatus(this.getClass().getSimpleName(), "Looking for files to compact: " + writeConfig.getTableName()); + + SyncableFileSystemView fileSystemView = (SyncableFileSystemView) this.hoodieTable.getSliceView(); + Set<HoodieFileGroupId> fgIdsInPendingCompactionAndClustering = fileSystemView.getPendingCompactionOperations() + .map(instantTimeOpPair -> instantTimeOpPair.getValue().getFileGroupId()) + .collect(Collectors.toSet()); + + // Exclude files in pending clustering from compaction. 
+ fgIdsInPendingCompactionAndClustering.addAll(fileSystemView.getFileGroupsInPendingClustering().map(Pair::getLeft).collect(Collectors.toSet())); + + if (filterLogCompactionOperations()) { + fgIdsInPendingCompactionAndClustering.addAll(fileSystemView.getPendingLogCompactionOperations() + .map(instantTimeOpPair -> instantTimeOpPair.getValue().getFileGroupId()) + .collect(Collectors.toList())); + } + + String lastCompletedInstantTime = hoodieTable.getMetaClient() + .getActiveTimeline().getTimelineOfActions(CollectionUtils.createSet(HoodieTimeline.COMMIT_ACTION, + HoodieTimeline.ROLLBACK_ACTION, HoodieTimeline.DELTA_COMMIT_ACTION)) + .filterCompletedInstants().lastInstant().get().getTimestamp(); + + List<HoodieCompactionOperation> operations = engineContext.flatMap(partitionPaths, partitionPath -> fileSystemView + .getLatestFileSlices(partitionPath) + .filter(slice -> filterFileSlice(slice, lastCompletedInstantTime, fgIdsInPendingCompactionAndClustering)) + .map(s -> { + List<HoodieLogFile> logFiles = + s.getLogFiles().sorted(HoodieLogFile.getLogFileComparator()).collect(toList()); + totalLogFiles.add(logFiles.size()); + totalFileSlices.add(1L); + // Avro generated classes are not inheriting Serializable. 
Using CompactionOperation POJO + // for Map operations and collecting them finally in Avro generated classes for storing + // into meta files.6 + Option<HoodieBaseFile> dataFile = s.getBaseFile(); + return new CompactionOperation(dataFile, partitionPath, logFiles, + writeConfig.getCompactionStrategy().captureMetrics(writeConfig, s)); + }), partitionPaths.size()).stream() + .map(CompactionUtils::buildHoodieCompactionOperation).collect(toList()); + + LOG.info("Total of " + operations.size() + " compaction operations are retrieved"); + LOG.info("Total number of latest files slices " + totalFileSlices.value()); + LOG.info("Total number of log files " + totalLogFiles.value()); + LOG.info("Total number of file slices " + totalFileSlices.value()); + + if (operations.isEmpty()) { + LOG.warn("No operations are retrieved for " + metaClient.getBasePath()); + return null; + } + + // Filter the compactions with the passed in filter. This lets us choose most effective compactions only + HoodieCompactionPlan compactionPlan = getCompactionPlan(metaClient, operations); + ValidationUtils.checkArgument( + compactionPlan.getOperations().stream().noneMatch( + op -> fgIdsInPendingCompactionAndClustering.contains(new HoodieFileGroupId(op.getPartitionPath(), op.getFileId()))), + "Bad Compaction Plan. FileId MUST NOT have multiple pending compactions. " + + "Please fix your strategy implementation. 
FileIdsWithPendingCompactions :" + fgIdsInPendingCompactionAndClustering + ", Selected workload :" + compactionPlan); + if (compactionPlan.getOperations().isEmpty()) { + LOG.warn("After filtering, Nothing to compact for " + metaClient.getBasePath()); + } + return compactionPlan; + } + + protected abstract HoodieCompactionPlan getCompactionPlan(HoodieTableMetaClient metaClient, List<HoodieCompactionOperation> operations); + + protected abstract boolean filterLogCompactionOperations(); + + protected List<String> filterPartitionPathsByStrategy(HoodieWriteConfig writeConfig, List<String> partitionPaths) { + return partitionPaths; + } + + protected boolean filterFileSlice(FileSlice fileSlice, String lastCompletedInstantTime, Set<HoodieFileGroupId> pendingFileGroupIds) { + return fileSlice.getLogFiles().count() > 0 && !pendingFileGroupIds.contains(fileSlice.getFileGroupId()); Review Comment: Previously the filter condition was ``` !fgIdsInPendingCompactionAndClustering.contains(slice.getFileGroupId()) ``` It looks like this patch additionally adds a logFileCount > 0 check (for major compaction). Can we revert that? ########## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/CompactionExecutionStrategy.java: ########## @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.table.action.compact; + +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.common.model.CompactionOperation; +import org.apache.hudi.common.model.HoodieBaseFile; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner; +import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.table.HoodieCompactionHandler; +import org.apache.hudi.table.HoodieTable; + +import java.io.IOException; +import java.io.Serializable; +import java.util.Iterator; +import java.util.List; + +public class CompactionExecutionStrategy<T extends HoodieRecordPayload, I, K, O> implements Serializable { + + protected void transitionRequestedToInflight(HoodieTable table, String compactionInstantTime) { + HoodieActiveTimeline timeline = table.getActiveTimeline(); + HoodieInstant instant = HoodieTimeline.getCompactionRequestedInstant(compactionInstantTime); + // Mark instant as compaction inflight + timeline.transitionCompactionRequestedToInflight(instant); + table.getMetaClient().reloadActiveTimeline(); + } + + protected String instantTimeToUseForScanning(String compactionInstantTime, String maxInstantTime) { + return maxInstantTime; + } + + protected boolean shouldPreserveCommitMetadata() { + return false; Review Comment: infact we don't need this. preserveCommitMetadata is fetched from writeConfig from within HoodieMergeHandle. ########## hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java: ########## @@ -156,7 +159,14 @@ protected void processNextRecord(HoodieRecord<? 
extends HoodieRecordPayload> hoo // If combinedValue is oldValue, no need rePut oldRecord if (combinedValue != oldValue) { HoodieOperation operation = hoodieRecord.getOperation(); - records.put(key, new HoodieAvroRecord<>(new HoodieKey(key, hoodieRecord.getPartitionPath()), combinedValue, operation)); + HoodieRecord latestHoodieRecord = new HoodieAvroRecord<>(new HoodieKey(key, hoodieRecord.getPartitionPath()), combinedValue, operation); + // If preserveMetadata is true. Use the same location. + if (this.preserveCommitMetadata) { Review Comment: There could be some gaps here. For example, let's say this is set to true: only the records that got updated get new metadata, while the records that are not touched (L173) still hold the old metadata. I'm also wondering whether we really need this. Why can't we rewrite only the file name and leave the other fields untouched, for both major and minor compaction? ########## hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java: ########## @@ -234,6 +234,13 @@ public final class HoodieMetadataConfig extends HoodieConfig { + "metadata table which are never added before. This config determines how to handle " + "such spurious deletes"); + public static final ConfigProperty<Boolean> USE_LOG_RECORD_READER_SCAN_V2 = ConfigProperty + .key(METADATA_PREFIX + ".log.record.reader.use.scanV2") + .defaultValue(false) + .sinceVersion("0.10.10") + .withDocumentation("There are cases when extra files are requested to be deleted from metadata table which was never added before. This config" Review Comment: Please fix the documentation. It is copied from the spurious-deletes config above and does not describe what the scanV2 option does. ########## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java: ########## @@ -456,6 +491,23 @@ public List<WriteStatus> close() { } } + public void write(Map<String, HoodieRecord<?
extends HoodieRecordPayload>> recordMap) { + Iterator<String> keyIterator = recordMap.keySet().stream().iterator(); + try { + while (keyIterator.hasNext()) { + final String key = keyIterator.next(); + HoodieRecord<T> record = (HoodieRecord<T>) recordMap.get(key); + init(record); + // For logCompaction operations all the records are read and written as a huge block. Review Comment: I have left a comment elsewhere. I am a bit confused here; let's discuss this face to face. ########## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/CompactionExecutionStrategy.java: ########## @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.hudi.table.action.compact; + +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.common.model.CompactionOperation; +import org.apache.hudi.common.model.HoodieBaseFile; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner; +import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.table.HoodieCompactionHandler; +import org.apache.hudi.table.HoodieTable; + +import java.io.IOException; +import java.io.Serializable; +import java.util.Iterator; +import java.util.List; + +public class CompactionExecutionStrategy<T extends HoodieRecordPayload, I, K, O> implements Serializable { + + protected void transitionRequestedToInflight(HoodieTable table, String compactionInstantTime) { + HoodieActiveTimeline timeline = table.getActiveTimeline(); + HoodieInstant instant = HoodieTimeline.getCompactionRequestedInstant(compactionInstantTime); + // Mark instant as compaction inflight + timeline.transitionCompactionRequestedToInflight(instant); + table.getMetaClient().reloadActiveTimeline(); + } + + protected String instantTimeToUseForScanning(String compactionInstantTime, String maxInstantTime) { + return maxInstantTime; + } + + protected boolean shouldPreserveCommitMetadata() { + return false; Review Comment: ah, I see. this is used just for logCompaction and not for major compaction. ########## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/BaseHoodieCompactionPlanGenerator.java: ########## @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.table.action.compact.plan.generators; + +import org.apache.hudi.avro.model.HoodieCompactionOperation; +import org.apache.hudi.avro.model.HoodieCompactionPlan; +import org.apache.hudi.common.data.HoodieAccumulator; +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.CompactionOperation; +import org.apache.hudi.common.model.FileSlice; +import org.apache.hudi.common.model.HoodieBaseFile; +import org.apache.hudi.common.model.HoodieFileGroupId; +import org.apache.hudi.common.model.HoodieLogFile; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.table.view.SyncableFileSystemView; +import org.apache.hudi.common.util.CollectionUtils; +import org.apache.hudi.common.util.CompactionUtils; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.table.HoodieTable; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +import 
java.io.IOException; +import java.io.Serializable; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +import static java.util.stream.Collectors.toList; + +public abstract class BaseHoodieCompactionPlanGenerator<T extends HoodieRecordPayload, I, K, O> implements Serializable { + private static final Logger LOG = LogManager.getLogger(BaseHoodieCompactionPlanGenerator.class); + + protected final HoodieTable<T, I, K, O> hoodieTable; + protected final HoodieWriteConfig writeConfig; + protected final transient HoodieEngineContext engineContext; + + public BaseHoodieCompactionPlanGenerator(HoodieTable table, HoodieEngineContext engineContext, HoodieWriteConfig writeConfig) { + this.hoodieTable = table; + this.writeConfig = writeConfig; + this.engineContext = engineContext; + } + + public HoodieCompactionPlan generateCompactionPlan() throws IOException { + // Accumulator to keep track of total log files for a table + HoodieAccumulator totalLogFiles = engineContext.newAccumulator(); + // Accumulator to keep track of total log file slices for a table + HoodieAccumulator totalFileSlices = engineContext.newAccumulator(); + + // TODO : check if maxMemory is not greater than JVM or executor memory + // TODO - rollback any compactions in flight + HoodieTableMetaClient metaClient = hoodieTable.getMetaClient(); + List<String> partitionPaths = FSUtils.getAllPartitionPaths(engineContext, writeConfig.getMetadataConfig(), metaClient.getBasePath()); + + // filter the partition paths if needed to reduce list status + partitionPaths = filterPartitionPathsByStrategy(writeConfig, partitionPaths); + + if (partitionPaths.isEmpty()) { + // In case no partitions could be picked, return no compaction plan + return null; + } + LOG.info("Looking for files to compact in " + partitionPaths + " partitions"); + engineContext.setJobStatus(this.getClass().getSimpleName(), "Looking for files to compact: " + 
writeConfig.getTableName()); + + SyncableFileSystemView fileSystemView = (SyncableFileSystemView) this.hoodieTable.getSliceView(); + Set<HoodieFileGroupId> fgIdsInPendingCompactionAndClustering = fileSystemView.getPendingCompactionOperations() + .map(instantTimeOpPair -> instantTimeOpPair.getValue().getFileGroupId()) + .collect(Collectors.toSet()); + + // Exclude files in pending clustering from compaction. + fgIdsInPendingCompactionAndClustering.addAll(fileSystemView.getFileGroupsInPendingClustering().map(Pair::getLeft).collect(Collectors.toSet())); + + if (filterLogCompactionOperations()) { + fgIdsInPendingCompactionAndClustering.addAll(fileSystemView.getPendingLogCompactionOperations() + .map(instantTimeOpPair -> instantTimeOpPair.getValue().getFileGroupId()) + .collect(Collectors.toList())); + } + + String lastCompletedInstantTime = hoodieTable.getMetaClient() + .getActiveTimeline().getTimelineOfActions(CollectionUtils.createSet(HoodieTimeline.COMMIT_ACTION, + HoodieTimeline.ROLLBACK_ACTION, HoodieTimeline.DELTA_COMMIT_ACTION)) + .filterCompletedInstants().lastInstant().get().getTimestamp(); + + List<HoodieCompactionOperation> operations = engineContext.flatMap(partitionPaths, partitionPath -> fileSystemView + .getLatestFileSlices(partitionPath) + .filter(slice -> filterFileSlice(slice, lastCompletedInstantTime, fgIdsInPendingCompactionAndClustering)) + .map(s -> { + List<HoodieLogFile> logFiles = + s.getLogFiles().sorted(HoodieLogFile.getLogFileComparator()).collect(toList()); + totalLogFiles.add(logFiles.size()); + totalFileSlices.add(1L); + // Avro generated classes are not inheriting Serializable. 
Using CompactionOperation POJO + // for Map operations and collecting them finally in Avro generated classes for storing + // into meta files.6 + Option<HoodieBaseFile> dataFile = s.getBaseFile(); + return new CompactionOperation(dataFile, partitionPath, logFiles, + writeConfig.getCompactionStrategy().captureMetrics(writeConfig, s)); + }), partitionPaths.size()).stream() + .map(CompactionUtils::buildHoodieCompactionOperation).collect(toList()); Review Comment: in master, we have this additional filter ``` .filter(c -> !c.getDeltaFileNames().isEmpty()), partitionPaths.size()).stream() ``` may I know where do we have this in this patch? ########## hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java: ########## @@ -362,6 +381,228 @@ protected synchronized void scanInternal(Option<KeySpec> keySpecOpt) { } } + private void scanInternalV2(Option<KeySpec> keySpecOption, boolean skipProcessingBlocks) { + currentInstantLogBlocks = new ArrayDeque<>(); + progress = 0.0f; + totalLogFiles = new AtomicLong(0); + totalRollbacks = new AtomicLong(0); + totalCorruptBlocks = new AtomicLong(0); + totalLogBlocks = new AtomicLong(0); + totalLogRecords = new AtomicLong(0); + HoodieLogFormatReader logFormatReaderWrapper = null; + HoodieTimeline commitsTimeline = this.hoodieTableMetaClient.getCommitsTimeline(); + HoodieTimeline completedInstantsTimeline = commitsTimeline.filterCompletedInstants(); + HoodieTimeline inflightInstantsTimeline = commitsTimeline.filterInflights(); + try { + + // Get the key field based on populate meta fields config + // and the table type + final String keyField = getKeyField(); + + boolean enableRecordLookups = !forceFullScan; + // Iterate over the paths + logFormatReaderWrapper = new HoodieLogFormatReader(fs, + logFilePaths.stream().map(logFile -> new HoodieLogFile(new Path(logFile))).collect(Collectors.toList()), + readerSchema, readBlocksLazily, reverseReader, bufferSize, enableRecordLookups, keyField, 
internalSchema); + + /** + * Scanning log blocks and placing the compacted blocks at the right place require two traversals. + * First traversal to identify the rollback blocks and valid data and compacted blocks. + * + * Scanning blocks is easy to do in single writer mode, where the rollback block is right after the effected data blocks. + * With multiwriter mode the blocks can be out of sync. An example scenario. + * B1, B2, B3, B4, R1(B3), B5 + * In this case, rollback block R1 is invalidating the B3 which is not the previous block. + * This becomes more complicated if we have compacted blocks, which are data blocks created using log compaction. + * + * To solve this, run a single traversal, collect all the valid blocks that are not corrupted + * along with the block instant times and rollback block's target instant times. + * + * As part of second traversal iterate block instant times in reverse order. + * While iterating in reverse order keep a track of final compacted instant times for each block. + * In doing so, when a data block is seen include the final compacted block if it is not already added. + * + * find the final compacted block which contains the merged contents. + * For example B1 and B2 are merged and created a compacted block called M1 and now M1, B3 and B4 are merged and + * created another compacted block called M2. So, now M2 is the final block which contains all the changes of B1,B2,B3,B4. + * So, blockTimeToCompactionBlockTimeMap will look like + * (B1 -> M2), (B2 -> M2), (B3 -> M2), (B4 -> M2), (M1 -> M2) + * This map is updated while iterating and is used to place the compacted blocks in the correct position. + * This way we can have multiple layers of merge blocks and still be able to find the correct positions of merged blocks. + */ + + // Collect targetRollbackInstants, using which we can determine which blocks are invalid. + Set<String> targetRollbackInstants = new HashSet<>(); + + // This holds block instant time to list of blocks. 
Note here the log blocks can be normal data blocks or compacted log blocks. + Map<String, List<HoodieLogBlock>> instantToBlocksMap = new HashMap<>(); + + // Order of Instants. + List<String> orderedInstantsList = new ArrayList<>(); + + Set<HoodieLogFile> scannedLogFiles = new HashSet<>(); + + /* + * 1. First step to traverse in forward direction. While traversing the log blocks collect following, + * a. instant times + * b. instant to logblocks map. + * c. targetRollbackInstants. + */ + while (logFormatReaderWrapper.hasNext()) { + HoodieLogFile logFile = logFormatReaderWrapper.getLogFile(); + LOG.info("Scanning log file " + logFile); + scannedLogFiles.add(logFile); + totalLogFiles.set(scannedLogFiles.size()); + // Use the HoodieLogFileReader to iterate through the blocks in the log file + HoodieLogBlock logBlock = logFormatReaderWrapper.next(); + final String instantTime = logBlock.getLogBlockHeader().get(INSTANT_TIME); + totalLogBlocks.incrementAndGet(); + // Ignore the corrupt blocks. No further handling is required for them. 
+ if (logBlock.getBlockType().equals(CORRUPT_BLOCK)) { + LOG.info("Found a corrupt block in " + logFile.getPath()); + totalCorruptBlocks.incrementAndGet(); + continue; + } + if (!HoodieTimeline.compareTimestamps(logBlock.getLogBlockHeader().get(INSTANT_TIME), + HoodieTimeline.LESSER_THAN_OR_EQUALS, this.latestInstantTime)) { + // hit a block with instant time greater than should be processed, stop processing further + break; + } + if (logBlock.getBlockType() != COMMAND_BLOCK) { + if (!completedInstantsTimeline.containsOrBeforeTimelineStarts(instantTime) + || inflightInstantsTimeline.containsInstant(instantTime)) { + // hit an uncommitted block possibly from a failed write, move to the next one and skip processing this one + continue; + } + if (instantRange.isPresent() && !instantRange.get().isInRange(instantTime)) { + // filter the log block by instant range + continue; + } + } + + switch (logBlock.getBlockType()) { + case HFILE_DATA_BLOCK: + case AVRO_DATA_BLOCK: + case DELETE_BLOCK: + List<HoodieLogBlock> logBlocksList = instantToBlocksMap.getOrDefault(instantTime, new ArrayList<>()); + if (logBlocksList.size() == 0) { + // Keep a track of instant Times in the order of arrival. + orderedInstantsList.add(instantTime); + } + logBlocksList.add(logBlock); + instantToBlocksMap.put(instantTime, logBlocksList); + break; + case COMMAND_BLOCK: + LOG.info("Reading a command block from file " + logFile.getPath()); + // This is a command block - take appropriate action based on the command + HoodieCommandBlock commandBlock = (HoodieCommandBlock) logBlock; + + // Rollback blocks contain information of instants that are failed, collect them in a set.. 
+ if (commandBlock.getType().equals(ROLLBACK_BLOCK)) { + totalRollbacks.incrementAndGet(); + String targetInstantForCommandBlock = + logBlock.getLogBlockHeader().get(TARGET_INSTANT_TIME); + targetRollbackInstants.add(targetInstantForCommandBlock); + } else { + throw new UnsupportedOperationException("Command type not yet supported."); + } + break; + default: + throw new UnsupportedOperationException("Block type not yet supported."); + } + } + + if (LOG.isDebugEnabled()) { + LOG.debug("Ordered instant times seen " + orderedInstantsList); + } + + int numBlocksRolledBack = 0; + + // All the block's instants time that are added to the queue are collected in this set. + Set<String> instantTimesIncluded = new HashSet<>(); + + // Key will have details related to instant time and value will be empty if that instant is not compacted. + // Ex: B1(i1), B2(i2), CB(i3,[i1,i2]) entries will be like i1 -> i3, i2 -> i3. + Map<String, String> blockTimeToCompactionBlockTimeMap = new HashMap<>(); + + /* + * 2. Iterate the instants list in reverse order to get the latest instants first. + * While iterating update the blockTimeToCompactionBlockTimesMap and include the compacted blocks in right position. + */ + for (int i = orderedInstantsList.size() - 1; i >= 0; i--) { + String instantTime = orderedInstantsList.get(i); + + // Exclude the blocks which are included in targetRollbackInstants set. + // Here, rollback can include instants affiliated to deltacommits or log compaction commits. + if (targetRollbackInstants.contains(instantTime)) { + numBlocksRolledBack += instantToBlocksMap.get(instantTime).size(); + continue; + } + List<HoodieLogBlock> instantsBlocks = instantToBlocksMap.get(instantTime); + if (instantsBlocks.size() == 0) { + throw new HoodieException("Data corrupted while writing. Found zero blocks for an instant " + instantTime); + } + HoodieLogBlock firstBlock = instantsBlocks.get(0); + + // For compacted blocks COMPACTED_BLOCK_TIMES entry is present under its headers. 
+ if (firstBlock.getLogBlockHeader().containsKey(COMPACTED_BLOCK_TIMES)) { + // When compacted blocks are seen update the blockTimeToCompactionBlockTimeMap. + Arrays.stream(firstBlock.getLogBlockHeader().get(COMPACTED_BLOCK_TIMES).split(",")) + .forEach(originalInstant -> { + String finalInstant = blockTimeToCompactionBlockTimeMap.get(instantTime); + blockTimeToCompactionBlockTimeMap.put(originalInstant, finalInstant); + }); + } else { + // When a data block is found check if it is already compacted. + String compactedFinalInstantTime = blockTimeToCompactionBlockTimeMap.get(instantTime); + if (compactedFinalInstantTime == null) { + // If it is not compacted then add the blocks related to the instant time at the end of the queue and continue. + instantToBlocksMap.get(instantTime).forEach(block -> currentInstantLogBlocks.addLast(block)); Review Comment: is this addressed? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org