nsivabalan commented on code in PR #5958:
URL: https://github.com/apache/hudi/pull/5958#discussion_r985130392


##########
hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java:
##########
@@ -234,6 +234,13 @@ public final class HoodieMetadataConfig extends HoodieConfig {
          + "metadata table which are never added before. This config determines how to handle "
          + "such spurious deletes");
 
+  public static final ConfigProperty<Boolean> USE_LOG_RECORD_READER_SCAN_V2 = ConfigProperty
+      .key(METADATA_PREFIX + ".log.record.reader.use.scanV2")
+      .defaultValue(false)
+      .sinceVersion("0.10.10")

Review Comment:
   The sinceVersion here should be 0.13.0, not 0.10.10.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/RunCompactionActionExecutor.java:
##########
@@ -50,29 +49,35 @@
 
   private final HoodieCompactor compactor;
   private final HoodieCompactionHandler compactionHandler;
+  private WriteOperationType operationType;
 
   public RunCompactionActionExecutor(HoodieEngineContext context,
                                      HoodieWriteConfig config,
                                      HoodieTable table,
                                      String instantTime,
                                      HoodieCompactor compactor,
-                                     HoodieCompactionHandler compactionHandler) {
+                                     HoodieCompactionHandler compactionHandler,
+                                     WriteOperationType operationType) {
     super(context, config, table, instantTime);
     this.compactor = compactor;
     this.compactionHandler = compactionHandler;
+    this.operationType = operationType;

Review Comment:
   Can we assert that the type is either COMPACT or LOG_COMPACT? Because in the other methods below, the else blocks don't check the type, so it could end up being anything.
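   
   A minimal sketch of the kind of guard I mean, assuming `ValidationUtils.checkArgument` is usable here and that `WriteOperationType` exposes a `LOG_COMPACT` value (names taken from elsewhere in this patch, so treat them as assumptions):
   ```
   public RunCompactionActionExecutor(HoodieEngineContext context,
                                      HoodieWriteConfig config,
                                      HoodieTable table,
                                      String instantTime,
                                      HoodieCompactor compactor,
                                      HoodieCompactionHandler compactionHandler,
                                      WriteOperationType operationType) {
     super(context, config, table, instantTime);
     // Fail fast on anything other than COMPACT/LOG_COMPACT, since the else branches
     // in the methods below never re-check the operation type.
     ValidationUtils.checkArgument(operationType == WriteOperationType.COMPACT
             || operationType == WriteOperationType.LOG_COMPACT,
         "RunCompactionActionExecutor only supports COMPACT and LOG_COMPACT");
     this.compactor = compactor;
     this.compactionHandler = compactionHandler;
     this.operationType = operationType;
   }
   ```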



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/CompactionExecutionStrategy.java:
##########
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.compact;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.model.CompactionOperation;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieCompactionHandler;
+import org.apache.hudi.table.HoodieTable;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Iterator;
+import java.util.List;
+
+public class CompactionExecutionStrategy<T extends HoodieRecordPayload, I, K, O> implements Serializable {
+
+  protected void transitionRequestedToInflight(HoodieTable table, String compactionInstantTime) {
+    HoodieActiveTimeline timeline = table.getActiveTimeline();
+    HoodieInstant instant = HoodieTimeline.getCompactionRequestedInstant(compactionInstantTime);
+    // Mark instant as compaction inflight
+    timeline.transitionCompactionRequestedToInflight(instant);
+    table.getMetaClient().reloadActiveTimeline();
+  }
+
+  protected String instantTimeToUseForScanning(String compactionInstantTime, String maxInstantTime) {
+    return maxInstantTime;
+  }
+
+  protected boolean shouldPreserveCommitMetadata() {
+    return false;

Review Comment:
   This is not right. We should fetch this value from the CompactionConfig instead of hard-coding it.
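   
   Roughly what I have in mind, assuming the strategy has (or is handed) the write config; the accessor name below is hypothetical and only meant to illustrate "read it from config":
   ```
   protected boolean shouldPreserveCommitMetadata(HoodieWriteConfig writeConfig) {
     // Return the user-configured compaction setting rather than a hard-coded false.
     // (Accessor name is illustrative, not the actual config API.)
     return writeConfig.isPreserveHoodieCommitMetadataForCompaction();
   }
   ```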



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java:
##########
@@ -379,11 +407,16 @@ public void doAppend() {
       flushToDiskIfRequired(record);
       writeToBuffer(record);
     }
-    appendDataAndDeleteBlocks(header);
+    appendDataAndDeleteBlocks(header, true);
     estimatedNumberOfBytesWritten += averageRecordSize * numberOfRecords;
   }
 
-  protected void appendDataAndDeleteBlocks(Map<HeaderMetadataType, String> header) {
+  /**
+   * Appends data and delete blocks. When appendDeleteBlocks value is false, only data blocks are appended.
+   * This is done so that all the data blocks are created first and then a single delete block is added.
+   * Otherwise what can end up happening is creation of multiple small delete blocks get added after each data block.

Review Comment:
   Gotcha. Can you remind me in which case the delete block will be non-empty while writing the output of a log compaction? Because we would have already removed the records to be deleted from the hashmap. This is akin to our regular compaction, right: we take the base file plus log blocks (some of which could be delete blocks as well) and write out a new base file, so we only write out valid records and there is no need to add a delete block.
   
   With log compaction, I would expect no delete blocks to be written at all.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java:
##########
@@ -240,8 +245,9 @@ private Pair<Boolean, List<CleanFileInfo>> getFilesToCleanKeepingLatestVersions(
      int keepVersions = config.getCleanerFileVersionsRetained();
      // do not cleanup slice required for pending compaction
      Iterator<FileSlice> fileSliceIterator =
-          fileGroup.getAllFileSlices().filter(fs -> !isFileSliceNeededForPendingCompaction(fs)).iterator();
-      if (isFileGroupInPendingCompaction(fileGroup)) {
+          fileGroup.getAllFileSlices().filter(fs -> !isFileSliceNeededForPendingCompaction(fs)
+              && !isFileSliceNeededForPendingLogCompaction(fs)).iterator();
+      if (isFileGroupInPendingCompaction(fileGroup) || isFileGroupInPendingLogCompaction(fileGroup)) {

Review Comment:
   Sorry, I don't see this being addressed:
   ```
         if (isFileGroupInPendingCompaction(fileGroup)) {
           // We have already saved the last version of file-groups for pending compaction Id
           keepVersions--;
         }
   ```
   
   In fact, you removed isFileGroupInPendingLogCompaction from the if condition and missed fixing the other implementation.
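   
   i.e. something along these lines (a sketch of the missing piece, mirroring the existing pending-compaction handling):
   ```
   if (isFileGroupInPendingCompaction(fileGroup) || isFileGroupInPendingLogCompaction(fileGroup)) {
     // We have already saved the last version of file-groups for the pending (log) compaction Id
     keepVersions--;
   }
   ```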
   
   



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/BaseHoodieCompactionPlanGenerator.java:
##########
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.compact.plan.generators;
+
+import org.apache.hudi.avro.model.HoodieCompactionOperation;
+import org.apache.hudi.avro.model.HoodieCompactionPlan;
+import org.apache.hudi.common.data.HoodieAccumulator;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.CompactionOperation;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieFileGroupId;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.view.SyncableFileSystemView;
+import org.apache.hudi.common.util.CollectionUtils;
+import org.apache.hudi.common.util.CompactionUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static java.util.stream.Collectors.toList;
+
+public abstract class BaseHoodieCompactionPlanGenerator<T extends 
HoodieRecordPayload, I, K, O> implements Serializable {
+  private static final Logger LOG = 
LogManager.getLogger(BaseHoodieCompactionPlanGenerator.class);
+
+  protected final HoodieTable<T, I, K, O> hoodieTable;
+  protected final HoodieWriteConfig writeConfig;
+  protected final transient HoodieEngineContext engineContext;
+
+  public BaseHoodieCompactionPlanGenerator(HoodieTable table, 
HoodieEngineContext engineContext, HoodieWriteConfig writeConfig) {
+    this.hoodieTable = table;
+    this.writeConfig = writeConfig;
+    this.engineContext = engineContext;
+  }
+
+  public HoodieCompactionPlan generateCompactionPlan() throws IOException {
+    // Accumulator to keep track of total log files for a table
+    HoodieAccumulator totalLogFiles = engineContext.newAccumulator();
+    // Accumulator to keep track of total log file slices for a table
+    HoodieAccumulator totalFileSlices = engineContext.newAccumulator();
+
+    // TODO : check if maxMemory is not greater than JVM or executor memory
+    // TODO - rollback any compactions in flight
+    HoodieTableMetaClient metaClient = hoodieTable.getMetaClient();
+    List<String> partitionPaths = FSUtils.getAllPartitionPaths(engineContext, 
writeConfig.getMetadataConfig(), metaClient.getBasePath());
+
+    // filter the partition paths if needed to reduce list status
+    partitionPaths = filterPartitionPathsByStrategy(writeConfig, 
partitionPaths);
+
+    if (partitionPaths.isEmpty()) {
+      // In case no partitions could be picked, return no compaction plan
+      return null;
+    }
+    LOG.info("Looking for files to compact in " + partitionPaths + " 
partitions");
+    engineContext.setJobStatus(this.getClass().getSimpleName(), "Looking for 
files to compact: " + writeConfig.getTableName());
+
+    SyncableFileSystemView fileSystemView = (SyncableFileSystemView) 
this.hoodieTable.getSliceView();
+    Set<HoodieFileGroupId> fgIdsInPendingCompactionAndClustering = 
fileSystemView.getPendingCompactionOperations()
+        .map(instantTimeOpPair -> 
instantTimeOpPair.getValue().getFileGroupId())
+        .collect(Collectors.toSet());
+
+    // Exclude files in pending clustering from compaction.
+    
fgIdsInPendingCompactionAndClustering.addAll(fileSystemView.getFileGroupsInPendingClustering().map(Pair::getLeft).collect(Collectors.toSet()));
+
+    if (filterLogCompactionOperations()) {
+      
fgIdsInPendingCompactionAndClustering.addAll(fileSystemView.getPendingLogCompactionOperations()
+          .map(instantTimeOpPair -> 
instantTimeOpPair.getValue().getFileGroupId())
+          .collect(Collectors.toList()));
+    }
+
+    String lastCompletedInstantTime = hoodieTable.getMetaClient()
+        
.getActiveTimeline().getTimelineOfActions(CollectionUtils.createSet(HoodieTimeline.COMMIT_ACTION,
+            HoodieTimeline.ROLLBACK_ACTION, 
HoodieTimeline.DELTA_COMMIT_ACTION))
+        .filterCompletedInstants().lastInstant().get().getTimestamp();
+
+    List<HoodieCompactionOperation> operations = 
engineContext.flatMap(partitionPaths, partitionPath -> fileSystemView
+        .getLatestFileSlices(partitionPath)
+        .filter(slice -> filterFileSlice(slice, lastCompletedInstantTime, 
fgIdsInPendingCompactionAndClustering))
+        .map(s -> {
+          List<HoodieLogFile> logFiles =
+              
s.getLogFiles().sorted(HoodieLogFile.getLogFileComparator()).collect(toList());
+          totalLogFiles.add(logFiles.size());
+          totalFileSlices.add(1L);
+          // Avro generated classes are not inheriting Serializable. Using 
CompactionOperation POJO
+          // for Map operations and collecting them finally in Avro generated 
classes for storing
+          // into meta files.6
+          Option<HoodieBaseFile> dataFile = s.getBaseFile();
+          return new CompactionOperation(dataFile, partitionPath, logFiles,
+              writeConfig.getCompactionStrategy().captureMetrics(writeConfig, 
s));
+        }), partitionPaths.size()).stream()
+        
.map(CompactionUtils::buildHoodieCompactionOperation).collect(toList());
+
+    LOG.info("Total of " + operations.size() + " compaction operations are 
retrieved");
+    LOG.info("Total number of latest files slices " + totalFileSlices.value());
+    LOG.info("Total number of log files " + totalLogFiles.value());
+    LOG.info("Total number of file slices " + totalFileSlices.value());
+
+    if (operations.isEmpty()) {
+      LOG.warn("No operations are retrieved for " + metaClient.getBasePath());
+      return null;
+    }
+
+    // Filter the compactions with the passed in filter. This lets us choose 
most effective compactions only
+    HoodieCompactionPlan compactionPlan = getCompactionPlan(metaClient, 
operations);
+    ValidationUtils.checkArgument(
+        compactionPlan.getOperations().stream().noneMatch(
+            op -> fgIdsInPendingCompactionAndClustering.contains(new 
HoodieFileGroupId(op.getPartitionPath(), op.getFileId()))),
+        "Bad Compaction Plan. FileId MUST NOT have multiple pending 
compactions. "
+            + "Please fix your strategy implementation. 
FileIdsWithPendingCompactions :" + fgIdsInPendingCompactionAndClustering
+            + ", Selected workload :" + compactionPlan);
+    if (compactionPlan.getOperations().isEmpty()) {
+      LOG.warn("After filtering, Nothing to compact for " + 
metaClient.getBasePath());
+    }
+    return compactionPlan;
+  }
+
+  protected abstract HoodieCompactionPlan 
getCompactionPlan(HoodieTableMetaClient metaClient, 
List<HoodieCompactionOperation> operations);
+
+  protected abstract boolean filterLogCompactionOperations();
+
+  protected List<String> filterPartitionPathsByStrategy(HoodieWriteConfig 
writeConfig, List<String> partitionPaths) {
+    return partitionPaths;
+  }
+
+  protected boolean filterFileSlice(FileSlice fileSlice, String lastCompletedInstantTime, Set<HoodieFileGroupId> pendingFileGroupIds) {
+    return fileSlice.getLogFiles().count() > 0 && !pendingFileGroupIds.contains(fileSlice.getFileGroupId());

Review Comment:
   Previously the filter condition was
   ```
   !fgIdsInPendingCompactionAndClustering.contains(slice.getFileGroupId())
   ```
   
   It looks like this patch additionally adds a logFileCount > 0 check (for major compaction)?
   Can we revert that?
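   
   For reference, the revert would look like this (just a sketch, keeping only the original check):
   ```
   protected boolean filterFileSlice(FileSlice fileSlice, String lastCompletedInstantTime, Set<HoodieFileGroupId> pendingFileGroupIds) {
     // Skip only file slices that already belong to a pending compaction/clustering plan;
     // leave the "does it have log files" decision to the compaction strategy, as before.
     return !pendingFileGroupIds.contains(fileSlice.getFileGroupId());
   }
   ```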



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/CompactionExecutionStrategy.java:
##########
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.compact;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.model.CompactionOperation;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieCompactionHandler;
+import org.apache.hudi.table.HoodieTable;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Iterator;
+import java.util.List;
+
+public class CompactionExecutionStrategy<T extends HoodieRecordPayload, I, K, O> implements Serializable {
+
+  protected void transitionRequestedToInflight(HoodieTable table, String compactionInstantTime) {
+    HoodieActiveTimeline timeline = table.getActiveTimeline();
+    HoodieInstant instant = HoodieTimeline.getCompactionRequestedInstant(compactionInstantTime);
+    // Mark instant as compaction inflight
+    timeline.transitionCompactionRequestedToInflight(instant);
+    table.getMetaClient().reloadActiveTimeline();
+  }
+
+  protected String instantTimeToUseForScanning(String compactionInstantTime, String maxInstantTime) {
+    return maxInstantTime;
+  }
+
+  protected boolean shouldPreserveCommitMetadata() {
+    return false;

Review Comment:
   In fact, we don't need this: preserveCommitMetadata is already fetched from the writeConfig inside HoodieMergeHandle.



##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java:
##########
@@ -156,7 +159,14 @@ protected void processNextRecord(HoodieRecord<? extends HoodieRecordPayload> hoo
      // If combinedValue is oldValue, no need rePut oldRecord
      if (combinedValue != oldValue) {
        HoodieOperation operation = hoodieRecord.getOperation();
-        records.put(key, new HoodieAvroRecord<>(new HoodieKey(key, hoodieRecord.getPartitionPath()), combinedValue, operation));
+        HoodieRecord latestHoodieRecord = new HoodieAvroRecord<>(new HoodieKey(key, hoodieRecord.getPartitionPath()), combinedValue, operation);
+        // If preserveMetadata is true. Use the same location.
+        if (this.preserveCommitMetadata) {

Review Comment:
   There could be some gaps here. For example, say this is set to true: only the records that got updated get new metadata, while the untouched ones (L173) still hold the old metadata.
   
   But I'm also wondering whether we really need this. Why can't we rewrite the filename alone and leave the other fields untouched, for both major and minor compaction?



##########
hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java:
##########
@@ -234,6 +234,13 @@ public final class HoodieMetadataConfig extends HoodieConfig {
          + "metadata table which are never added before. This config determines how to handle "
          + "such spurious deletes");
 
+  public static final ConfigProperty<Boolean> USE_LOG_RECORD_READER_SCAN_V2 = ConfigProperty
+      .key(METADATA_PREFIX + ".log.record.reader.use.scanV2")
+      .defaultValue(false)
+      .sinceVersion("0.10.10")
+      .withDocumentation("There are cases when extra files are requested to be deleted from metadata table which was never added before. This config"

Review Comment:
   Fix the documentation: this text describes the spurious-deletes config above, not the scanV2 reader flag.
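   
   For example, something along these lines (suggested wording only, based on what scanInternalV2 does in this patch; also picks up the 0.13.0 sinceVersion mentioned above):
   ```
   public static final ConfigProperty<Boolean> USE_LOG_RECORD_READER_SCAN_V2 = ConfigProperty
       .key(METADATA_PREFIX + ".log.record.reader.use.scanV2")
       .defaultValue(false)
       .sinceVersion("0.13.0")
       .withDocumentation("Enable the V2 log record reader scan while reading the metadata table. "
           + "The V2 scan understands log-compacted (merged) blocks and rollback blocks that may "
           + "appear out of order when there are multiple writers.");
   ```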



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java:
##########
@@ -456,6 +491,23 @@ public List<WriteStatus> close() {
     }
   }
 
+  public void write(Map<String, HoodieRecord<? extends HoodieRecordPayload>> recordMap) {
+    Iterator<String> keyIterator = recordMap.keySet().stream().iterator();
+    try {
+      while (keyIterator.hasNext()) {
+        final String key = keyIterator.next();
+        HoodieRecord<T> record = (HoodieRecord<T>) recordMap.get(key);
+        init(record);
+        // For logCompaction operations all the records are read and written as a huge block.

Review Comment:
   I have left a comment elsewhere. I am a bit confused here; let's discuss this f2f.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/CompactionExecutionStrategy.java:
##########
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.compact;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.model.CompactionOperation;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieCompactionHandler;
+import org.apache.hudi.table.HoodieTable;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Iterator;
+import java.util.List;
+
+public class CompactionExecutionStrategy<T extends HoodieRecordPayload, I, K, O> implements Serializable {
+
+  protected void transitionRequestedToInflight(HoodieTable table, String compactionInstantTime) {
+    HoodieActiveTimeline timeline = table.getActiveTimeline();
+    HoodieInstant instant = HoodieTimeline.getCompactionRequestedInstant(compactionInstantTime);
+    // Mark instant as compaction inflight
+    timeline.transitionCompactionRequestedToInflight(instant);
+    table.getMetaClient().reloadActiveTimeline();
+  }
+
+  protected String instantTimeToUseForScanning(String compactionInstantTime, String maxInstantTime) {
+    return maxInstantTime;
+  }
+
+  protected boolean shouldPreserveCommitMetadata() {
+    return false;

Review Comment:
   Ah, I see. This is used just for log compaction and not for major compaction.
   



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/BaseHoodieCompactionPlanGenerator.java:
##########
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.compact.plan.generators;
+
+import org.apache.hudi.avro.model.HoodieCompactionOperation;
+import org.apache.hudi.avro.model.HoodieCompactionPlan;
+import org.apache.hudi.common.data.HoodieAccumulator;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.CompactionOperation;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieFileGroupId;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.view.SyncableFileSystemView;
+import org.apache.hudi.common.util.CollectionUtils;
+import org.apache.hudi.common.util.CompactionUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static java.util.stream.Collectors.toList;
+
+public abstract class BaseHoodieCompactionPlanGenerator<T extends 
HoodieRecordPayload, I, K, O> implements Serializable {
+  private static final Logger LOG = 
LogManager.getLogger(BaseHoodieCompactionPlanGenerator.class);
+
+  protected final HoodieTable<T, I, K, O> hoodieTable;
+  protected final HoodieWriteConfig writeConfig;
+  protected final transient HoodieEngineContext engineContext;
+
+  public BaseHoodieCompactionPlanGenerator(HoodieTable table, 
HoodieEngineContext engineContext, HoodieWriteConfig writeConfig) {
+    this.hoodieTable = table;
+    this.writeConfig = writeConfig;
+    this.engineContext = engineContext;
+  }
+
+  public HoodieCompactionPlan generateCompactionPlan() throws IOException {
+    // Accumulator to keep track of total log files for a table
+    HoodieAccumulator totalLogFiles = engineContext.newAccumulator();
+    // Accumulator to keep track of total log file slices for a table
+    HoodieAccumulator totalFileSlices = engineContext.newAccumulator();
+
+    // TODO : check if maxMemory is not greater than JVM or executor memory
+    // TODO - rollback any compactions in flight
+    HoodieTableMetaClient metaClient = hoodieTable.getMetaClient();
+    List<String> partitionPaths = FSUtils.getAllPartitionPaths(engineContext, 
writeConfig.getMetadataConfig(), metaClient.getBasePath());
+
+    // filter the partition paths if needed to reduce list status
+    partitionPaths = filterPartitionPathsByStrategy(writeConfig, 
partitionPaths);
+
+    if (partitionPaths.isEmpty()) {
+      // In case no partitions could be picked, return no compaction plan
+      return null;
+    }
+    LOG.info("Looking for files to compact in " + partitionPaths + " 
partitions");
+    engineContext.setJobStatus(this.getClass().getSimpleName(), "Looking for 
files to compact: " + writeConfig.getTableName());
+
+    SyncableFileSystemView fileSystemView = (SyncableFileSystemView) 
this.hoodieTable.getSliceView();
+    Set<HoodieFileGroupId> fgIdsInPendingCompactionAndClustering = 
fileSystemView.getPendingCompactionOperations()
+        .map(instantTimeOpPair -> 
instantTimeOpPair.getValue().getFileGroupId())
+        .collect(Collectors.toSet());
+
+    // Exclude files in pending clustering from compaction.
+    
fgIdsInPendingCompactionAndClustering.addAll(fileSystemView.getFileGroupsInPendingClustering().map(Pair::getLeft).collect(Collectors.toSet()));
+
+    if (filterLogCompactionOperations()) {
+      
fgIdsInPendingCompactionAndClustering.addAll(fileSystemView.getPendingLogCompactionOperations()
+          .map(instantTimeOpPair -> 
instantTimeOpPair.getValue().getFileGroupId())
+          .collect(Collectors.toList()));
+    }
+
+    String lastCompletedInstantTime = hoodieTable.getMetaClient()
+        
.getActiveTimeline().getTimelineOfActions(CollectionUtils.createSet(HoodieTimeline.COMMIT_ACTION,
+            HoodieTimeline.ROLLBACK_ACTION, 
HoodieTimeline.DELTA_COMMIT_ACTION))
+        .filterCompletedInstants().lastInstant().get().getTimestamp();
+
+    List<HoodieCompactionOperation> operations = 
engineContext.flatMap(partitionPaths, partitionPath -> fileSystemView
+        .getLatestFileSlices(partitionPath)
+        .filter(slice -> filterFileSlice(slice, lastCompletedInstantTime, 
fgIdsInPendingCompactionAndClustering))
+        .map(s -> {
+          List<HoodieLogFile> logFiles =
+              
s.getLogFiles().sorted(HoodieLogFile.getLogFileComparator()).collect(toList());
+          totalLogFiles.add(logFiles.size());
+          totalFileSlices.add(1L);
+          // Avro generated classes are not inheriting Serializable. Using 
CompactionOperation POJO
+          // for Map operations and collecting them finally in Avro generated 
classes for storing
+          // into meta files.6
+          Option<HoodieBaseFile> dataFile = s.getBaseFile();
+          return new CompactionOperation(dataFile, partitionPath, logFiles,
+              writeConfig.getCompactionStrategy().captureMetrics(writeConfig, 
s));
+        }), partitionPaths.size()).stream()
+        
.map(CompactionUtils::buildHoodieCompactionOperation).collect(toList());

Review Comment:
   In master, we have this additional filter:
   ```
           .filter(c -> !c.getDeltaFileNames().isEmpty()), partitionPaths.size()).stream()
   ```
   May I know where this is in this patch?
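   
   If it is meant to be re-added here, I would expect it on the built operations just before the Avro conversion, e.g. (sketch of the stream tail only):
   ```
   .filter(op -> !op.getDeltaFileNames().isEmpty())
   .map(CompactionUtils::buildHoodieCompactionOperation)
   .collect(toList());
   ```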



##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java:
##########
@@ -362,6 +381,228 @@ protected synchronized void scanInternal(Option<KeySpec> 
keySpecOpt) {
     }
   }
 
+  private void scanInternalV2(Option<KeySpec> keySpecOption, boolean 
skipProcessingBlocks) {
+    currentInstantLogBlocks = new ArrayDeque<>();
+    progress = 0.0f;
+    totalLogFiles = new AtomicLong(0);
+    totalRollbacks = new AtomicLong(0);
+    totalCorruptBlocks = new AtomicLong(0);
+    totalLogBlocks = new AtomicLong(0);
+    totalLogRecords = new AtomicLong(0);
+    HoodieLogFormatReader logFormatReaderWrapper = null;
+    HoodieTimeline commitsTimeline = 
this.hoodieTableMetaClient.getCommitsTimeline();
+    HoodieTimeline completedInstantsTimeline = 
commitsTimeline.filterCompletedInstants();
+    HoodieTimeline inflightInstantsTimeline = 
commitsTimeline.filterInflights();
+    try {
+
+      // Get the key field based on populate meta fields config
+      // and the table type
+      final String keyField = getKeyField();
+
+      boolean enableRecordLookups = !forceFullScan;
+      // Iterate over the paths
+      logFormatReaderWrapper = new HoodieLogFormatReader(fs,
+          logFilePaths.stream().map(logFile -> new HoodieLogFile(new 
Path(logFile))).collect(Collectors.toList()),
+          readerSchema, readBlocksLazily, reverseReader, bufferSize, 
enableRecordLookups, keyField, internalSchema);
+
+      /**
+       * Scanning log blocks and placing the compacted blocks at the right 
place require two traversals.
+       * First traversal to identify the rollback blocks and valid data and 
compacted blocks.
+       *
+       * Scanning blocks is easy to do in single writer mode, where the 
rollback block is right after the effected data blocks.
+       * With multiwriter mode the blocks can be out of sync. An example 
scenario.
+       * B1, B2, B3, B4, R1(B3), B5
+       * In this case, rollback block R1 is invalidating the B3 which is not 
the previous block.
+       * This becomes more complicated if we have compacted blocks, which are 
data blocks created using log compaction.
+       *
+       * To solve this, run a single traversal, collect all the valid blocks 
that are not corrupted
+       * along with the block instant times and rollback block's target 
instant times.
+       *
+       * As part of second traversal iterate block instant times in reverse 
order.
+       * While iterating in reverse order keep a track of final compacted 
instant times for each block.
+       * In doing so, when a data block is seen include the final compacted 
block if it is not already added.
+       *
+       * find the final compacted block which contains the merged contents.
+       * For example B1 and B2 are merged and created a compacted block called 
M1 and now M1, B3 and B4 are merged and
+       * created another compacted block called M2. So, now M2 is the final 
block which contains all the changes of B1,B2,B3,B4.
+       * So, blockTimeToCompactionBlockTimeMap will look like
+       * (B1 -> M2), (B2 -> M2), (B3 -> M2), (B4 -> M2), (M1 -> M2)
+       * This map is updated while iterating and is used to place the 
compacted blocks in the correct position.
+       * This way we can have multiple layers of merge blocks and still be 
able to find the correct positions of merged blocks.
+       */
+
+      // Collect targetRollbackInstants, using which we can determine which 
blocks are invalid.
+      Set<String> targetRollbackInstants = new HashSet<>();
+
+      // This holds block instant time to list of blocks. Note here the log 
blocks can be normal data blocks or compacted log blocks.
+      Map<String, List<HoodieLogBlock>> instantToBlocksMap = new HashMap<>();
+
+      // Order of Instants.
+      List<String> orderedInstantsList = new ArrayList<>();
+
+      Set<HoodieLogFile> scannedLogFiles = new HashSet<>();
+
+      /*
+       * 1. First step to traverse in forward direction. While traversing the 
log blocks collect following,
+       *    a. instant times
+       *    b. instant to logblocks map.
+       *    c. targetRollbackInstants.
+       */
+      while (logFormatReaderWrapper.hasNext()) {
+        HoodieLogFile logFile = logFormatReaderWrapper.getLogFile();
+        LOG.info("Scanning log file " + logFile);
+        scannedLogFiles.add(logFile);
+        totalLogFiles.set(scannedLogFiles.size());
+        // Use the HoodieLogFileReader to iterate through the blocks in the 
log file
+        HoodieLogBlock logBlock = logFormatReaderWrapper.next();
+        final String instantTime = 
logBlock.getLogBlockHeader().get(INSTANT_TIME);
+        totalLogBlocks.incrementAndGet();
+        // Ignore the corrupt blocks. No further handling is required for them.
+        if (logBlock.getBlockType().equals(CORRUPT_BLOCK)) {
+          LOG.info("Found a corrupt block in " + logFile.getPath());
+          totalCorruptBlocks.incrementAndGet();
+          continue;
+        }
+        if 
(!HoodieTimeline.compareTimestamps(logBlock.getLogBlockHeader().get(INSTANT_TIME),
+            HoodieTimeline.LESSER_THAN_OR_EQUALS, this.latestInstantTime)) {
+          // hit a block with instant time greater than should be processed, 
stop processing further
+          break;
+        }
+        if (logBlock.getBlockType() != COMMAND_BLOCK) {
+          if 
(!completedInstantsTimeline.containsOrBeforeTimelineStarts(instantTime)
+              || inflightInstantsTimeline.containsInstant(instantTime)) {
+            // hit an uncommitted block possibly from a failed write, move to 
the next one and skip processing this one
+            continue;
+          }
+          if (instantRange.isPresent() && 
!instantRange.get().isInRange(instantTime)) {
+            // filter the log block by instant range
+            continue;
+          }
+        }
+
+        switch (logBlock.getBlockType()) {
+          case HFILE_DATA_BLOCK:
+          case AVRO_DATA_BLOCK:
+          case DELETE_BLOCK:
+            List<HoodieLogBlock> logBlocksList = 
instantToBlocksMap.getOrDefault(instantTime, new ArrayList<>());
+            if (logBlocksList.size() == 0) {
+              // Keep a track of instant Times in the order of arrival.
+              orderedInstantsList.add(instantTime);
+            }
+            logBlocksList.add(logBlock);
+            instantToBlocksMap.put(instantTime, logBlocksList);
+            break;
+          case COMMAND_BLOCK:
+            LOG.info("Reading a command block from file " + logFile.getPath());
+            // This is a command block - take appropriate action based on the 
command
+            HoodieCommandBlock commandBlock = (HoodieCommandBlock) logBlock;
+
+            // Rollback blocks contain information of instants that are 
failed, collect them in a set..
+            if (commandBlock.getType().equals(ROLLBACK_BLOCK)) {
+              totalRollbacks.incrementAndGet();
+              String targetInstantForCommandBlock =
+                  logBlock.getLogBlockHeader().get(TARGET_INSTANT_TIME);
+              targetRollbackInstants.add(targetInstantForCommandBlock);
+            } else {
+              throw new UnsupportedOperationException("Command type not yet 
supported.");
+            }
+            break;
+          default:
+            throw new UnsupportedOperationException("Block type not yet 
supported.");
+        }
+      }
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Ordered instant times seen " + orderedInstantsList);
+      }
+
+      int numBlocksRolledBack = 0;
+
+      // All the block's instants time that are added to the queue are 
collected in this set.
+      Set<String> instantTimesIncluded = new HashSet<>();
+
+      // Key will have details related to instant time and value will be empty 
if that instant is not compacted.
+      // Ex: B1(i1), B2(i2), CB(i3,[i1,i2]) entries will be like i1 -> i3, i2 
-> i3.
+      Map<String, String> blockTimeToCompactionBlockTimeMap = new HashMap<>();
+
+      /*
+       * 2. Iterate the instants list in reverse order to get the latest 
instants first.
+       *    While iterating update the blockTimeToCompactionBlockTimesMap and 
include the compacted blocks in right position.
+       */
+      for (int i = orderedInstantsList.size() - 1; i >= 0; i--) {
+        String instantTime = orderedInstantsList.get(i);
+
+        // Exclude the blocks which are included in targetRollbackInstants set.
+        // Here, rollback can include instants affiliated to deltacommits or 
log compaction commits.
+        if (targetRollbackInstants.contains(instantTime)) {
+          numBlocksRolledBack += instantToBlocksMap.get(instantTime).size();
+          continue;
+        }
+        List<HoodieLogBlock> instantsBlocks = 
instantToBlocksMap.get(instantTime);
+        if (instantsBlocks.size() == 0) {
+          throw new HoodieException("Data corrupted while writing. Found zero 
blocks for an instant " + instantTime);
+        }
+        HoodieLogBlock firstBlock = instantsBlocks.get(0);
+
+        // For compacted blocks COMPACTED_BLOCK_TIMES entry is present under 
its headers.
+        if (firstBlock.getLogBlockHeader().containsKey(COMPACTED_BLOCK_TIMES)) 
{
+          // When compacted blocks are seen update the 
blockTimeToCompactionBlockTimeMap.
+          
Arrays.stream(firstBlock.getLogBlockHeader().get(COMPACTED_BLOCK_TIMES).split(","))
+              .forEach(originalInstant -> {
+                String finalInstant = 
blockTimeToCompactionBlockTimeMap.get(instantTime);
+                blockTimeToCompactionBlockTimeMap.put(originalInstant, 
finalInstant);
+              });
+        } else {
+          // When a data block is found check if it is already compacted.
+          String compactedFinalInstantTime = 
blockTimeToCompactionBlockTimeMap.get(instantTime);
+          if (compactedFinalInstantTime == null) {
+            // If it is not compacted then add the blocks related to the 
instant time at the end of the queue and continue.
+            instantToBlocksMap.get(instantTime).forEach(block -> 
currentInstantLogBlocks.addLast(block));

Review Comment:
   is this addressed? 


