This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 2567ada6d5 Revert "[HUDI-5409] Avoid file index and use fs view cache in COW input format (#7493)" (#7526)
2567ada6d5 is described below

commit 2567ada6d5654bf8463cb55e25f5d662aa5a8475
Author: Sagar Sumit <sagarsumi...@gmail.com>
AuthorDate: Sat Dec 24 09:06:49 2022 +0530

    Revert "[HUDI-5409] Avoid file index and use fs view cache in COW input 
format (#7493)" (#7526)
    
    This reverts commit cc1c1e7b33d9c95e5a2ba0e9a1db428d1e1b2a00.
---
 .../hudi/execution/TestDisruptorMessageQueue.java  |   4 +-
 .../hadoop/HoodieCopyOnWriteTableInputFormat.java  | 144 +++++++--------------
 .../HoodieMergeOnReadTableInputFormat.java         |  30 ++---
 .../hudi/hadoop/utils/HoodieInputFormatUtils.java  |   2 +-
 4 files changed, 61 insertions(+), 119 deletions(-)

diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestDisruptorMessageQueue.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestDisruptorMessageQueue.java
index 7d324e5296..76c22f96e7 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestDisruptorMessageQueue.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestDisruptorMessageQueue.java
@@ -39,7 +39,6 @@ import org.apache.spark.TaskContext;
 import org.apache.spark.TaskContext$;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 import scala.Tuple2;
@@ -86,11 +85,10 @@ public class TestDisruptorMessageQueue extends HoodieClientTestHarness {
 
   // Test to ensure that we are reading all records from queue iterator in the same order
   // without any exceptions.
-  @Disabled("Disabled for unblocking 0.12.2 release. Disruptor queue is not 
part of this minor release. Tracked in HUDI-5410")
   @SuppressWarnings("unchecked")
   @Test
   @Timeout(value = 60)
-  public void testRecordReading() {
+  public void testRecordReading() throws Exception {
 
     final List<HoodieRecord> hoodieRecords = dataGen.generateInserts(instantTime, 100);
     ArrayList<HoodieRecord> beforeRecord = new ArrayList<>();
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieCopyOnWriteTableInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieCopyOnWriteTableInputFormat.java
index ce441bf2e2..140e7ff5b6 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieCopyOnWriteTableInputFormat.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieCopyOnWriteTableInputFormat.java
@@ -18,9 +18,21 @@
 
 package org.apache.hudi.hadoop;
 
+import org.apache.avro.Schema;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.Job;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.engine.HoodieLocalEngineContext;
-import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieLogFile;
@@ -30,8 +42,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.TableSchemaResolver;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
-import org.apache.hudi.common.table.view.FileSystemViewManager;
-import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
+import org.apache.hudi.common.util.CollectionUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.exception.HoodieException;
@@ -39,42 +50,21 @@ import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.hadoop.realtime.HoodieVirtualKeyInfo;
 import org.apache.hudi.hadoop.utils.HoodieHiveUtils;
 import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
-import org.apache.hudi.metadata.HoodieTableMetadataUtil;
-
-import org.apache.avro.Schema;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.ArrayWritable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.FileSplit;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapreduce.Job;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
 import javax.annotation.Nonnull;
-
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.stream.Collectors;
 
-import static org.apache.hudi.common.config.HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS;
-import static org.apache.hudi.common.config.HoodieMetadataConfig.ENABLE;
-import static org.apache.hudi.common.model.HoodieTableType.COPY_ON_WRITE;
-import static org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.buildMetadataConfig;
-import static org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.getFileStatus;
+import static org.apache.hudi.common.util.ValidationUtils.checkState;
 
 /**
  * Base implementation of the Hive's {@link FileInputFormat} allowing for reading of Hudi's
@@ -200,7 +190,7 @@ public class HoodieCopyOnWriteTableInputFormat extends HoodieTableInputFormat {
     return HoodieInputFormatUtils.filterIncrementalFileStatus(jobContext, tableMetaClient, timeline.get(), fileStatuses, commitsToCheck.get());
   }
 
-  protected FileStatus createFileStatusUnchecked(FileSlice fileSlice, Option<HoodieInstant> instantOpt, String basePath, Option<HoodieVirtualKeyInfo> virtualKeyInfoOpt) {
+  protected FileStatus createFileStatusUnchecked(FileSlice fileSlice, HiveHoodieTableFileIndex fileIndex, Option<HoodieVirtualKeyInfo> virtualKeyInfoOpt) {
     Option<HoodieBaseFile> baseFileOpt = fileSlice.getBaseFile();
 
     if (baseFileOpt.isPresent()) {
@@ -233,7 +223,6 @@ public class HoodieCopyOnWriteTableInputFormat extends HoodieTableInputFormat {
 
     Map<HoodieTableMetaClient, List<Path>> groupedPaths =
         HoodieInputFormatUtils.groupSnapshotPathsByMetaClient(tableMetaClientMap.values(), snapshotPaths);
-    Map<HoodieTableMetaClient, HoodieTableFileSystemView> fsViewCache = new HashMap<>();
 
     for (Map.Entry<HoodieTableMetaClient, List<Path>> entry : groupedPaths.entrySet()) {
       HoodieTableMetaClient tableMetaClient = entry.getKey();
@@ -247,83 +236,33 @@ public class HoodieCopyOnWriteTableInputFormat extends HoodieTableInputFormat {
       boolean shouldIncludePendingCommits =
           HoodieHiveUtils.shouldIncludePendingCommits(job, tableMetaClient.getTableConfig().getTableName());
 
-      // NOTE: Fetching virtual key info is a costly operation as it needs to load the commit metadata.
-      //       This is only needed for MOR realtime splits. Hence, for COW tables, this can be avoided.
-      Option<HoodieVirtualKeyInfo> virtualKeyInfoOpt = tableMetaClient.getTableType().equals(COPY_ON_WRITE) ? Option.empty() : getHoodieVirtualKeyInfo(tableMetaClient);
-      String basePath = tableMetaClient.getBasePathV2().toString();
-
-      if (conf.getBoolean(ENABLE.key(), DEFAULT_METADATA_ENABLE_FOR_READERS) && HoodieTableMetadataUtil.isFilesPartitionAvailable(tableMetaClient)) {
-        HiveHoodieTableFileIndex fileIndex =
-            new HiveHoodieTableFileIndex(
-                engineContext,
-                tableMetaClient,
-                props,
-                HoodieTableQueryType.SNAPSHOT,
-                partitionPaths,
-                queryCommitInstant,
-                shouldIncludePendingCommits);
-
-        Map<String, List<FileSlice>> partitionedFileSlices = fileIndex.listFileSlices();
-
-        targetFiles.addAll(
-            partitionedFileSlices.values()
-                .stream()
-                .flatMap(Collection::stream)
-                .filter(fileSlice -> checkIfValidFileSlice(fileSlice))
-                .map(fileSlice -> createFileStatusUnchecked(fileSlice, fileIndex.getLatestCompletedInstant(), basePath, virtualKeyInfoOpt))
-                .collect(Collectors.toList())
-        );
-      } else {
-        HoodieTimeline timeline = getActiveTimeline(tableMetaClient, shouldIncludePendingCommits);
-        Option<String> queryInstant = queryCommitInstant.or(() -> timeline.lastInstant().map(HoodieInstant::getTimestamp));
-        validateInstant(timeline, queryInstant);
-
-        try {
-          HoodieTableFileSystemView fsView = fsViewCache.computeIfAbsent(tableMetaClient, hoodieTableMetaClient ->
-              FileSystemViewManager.createInMemoryFileSystemViewWithTimeline(engineContext, hoodieTableMetaClient, buildMetadataConfig(job), timeline));
-
-          List<FileSlice> filteredFileSlices = new ArrayList<>();
-
-          for (Path p : entry.getValue()) {
-            String relativePartitionPath = FSUtils.getRelativePartitionPath(new Path(basePath), p);
-
-            List<FileSlice> fileSlices = queryInstant.map(
-                instant -> fsView.getLatestMergedFileSlicesBeforeOrOn(relativePartitionPath, instant))
-                .orElse(fsView.getLatestFileSlices(relativePartitionPath))
-                .collect(Collectors.toList());
-
-            filteredFileSlices.addAll(fileSlices);
-          }
-
-          targetFiles.addAll(
-              filteredFileSlices.stream()
-                  .filter(fileSlice -> checkIfValidFileSlice(fileSlice))
-                  .map(fileSlice -> createFileStatusUnchecked(fileSlice, timeline.filterCompletedInstants().lastInstant(), basePath, virtualKeyInfoOpt))
-                  .collect(Collectors.toList()));
-        } finally {
-          fsViewCache.forEach(((metaClient, fsView) -> fsView.close()));
-        }
-      }
+      HiveHoodieTableFileIndex fileIndex =
+          new HiveHoodieTableFileIndex(
+              engineContext,
+              tableMetaClient,
+              props,
+              HoodieTableQueryType.SNAPSHOT,
+              partitionPaths,
+              queryCommitInstant,
+              shouldIncludePendingCommits);
+
+      Map<String, List<FileSlice>> partitionedFileSlices = fileIndex.listFileSlices();
+
+      Option<HoodieVirtualKeyInfo> virtualKeyInfoOpt = getHoodieVirtualKeyInfo(tableMetaClient);
+
+      targetFiles.addAll(
+          partitionedFileSlices.values()
+              .stream()
+              .flatMap(Collection::stream)
+              .filter(fileSlice -> checkIfValidFileSlice(fileSlice))
+              .map(fileSlice -> createFileStatusUnchecked(fileSlice, fileIndex, virtualKeyInfoOpt))
+              .collect(Collectors.toList())
+      );
     }
 
     return targetFiles;
   }
 
-  private static HoodieTimeline getActiveTimeline(HoodieTableMetaClient metaClient, boolean shouldIncludePendingCommits) {
-    HoodieTimeline timeline = metaClient.getCommitsAndCompactionTimeline();
-    if (shouldIncludePendingCommits) {
-      return timeline;
-    } else {
-      return timeline.filterCompletedAndCompactionInstants();
-    }
-  }
-
-  private static void validateInstant(HoodieTimeline activeTimeline, Option<String> queryInstant) {
-    if (queryInstant.isPresent() && !activeTimeline.containsInstant(queryInstant.get())) {
-      throw new HoodieIOException(String.format("Query instant (%s) not found in the timeline", queryInstant.get()));
-    }
-  }
-
   protected boolean checkIfValidFileSlice(FileSlice fileSlice) {
     Option<HoodieBaseFile> baseFileOpt = fileSlice.getBaseFile();
     Option<HoodieLogFile> latestLogFileOpt = fileSlice.getLatestLogFile();
@@ -338,10 +277,15 @@ public class HoodieCopyOnWriteTableInputFormat extends HoodieTableInputFormat {
     }
   }
 
+  private void validate(List<FileStatus> targetFiles, List<FileStatus> legacyFileStatuses) {
+    List<FileStatus> diff = CollectionUtils.diff(targetFiles, legacyFileStatuses);
+    checkState(diff.isEmpty(), "Should be empty");
+  }
+
   @Nonnull
   protected static FileStatus getFileStatusUnchecked(HoodieBaseFile baseFile) {
     try {
-      return getFileStatus(baseFile);
+      return HoodieInputFormatUtils.getFileStatus(baseFile);
     } catch (IOException ioe) {
       throw new HoodieIOException("Failed to get file-status", ioe);
     }
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieMergeOnReadTableInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieMergeOnReadTableInputFormat.java
index 6a198f9ad3..95a1a74b65 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieMergeOnReadTableInputFormat.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieMergeOnReadTableInputFormat.java
@@ -18,6 +18,16 @@
 
 package org.apache.hudi.hadoop.realtime;
 
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.SplitLocationInfo;
+import org.apache.hadoop.mapreduce.Job;
 import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
@@ -33,23 +43,13 @@ import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.hadoop.BootstrapBaseFileSplit;
 import org.apache.hudi.hadoop.FileStatusWithBootstrapBaseFile;
+import org.apache.hudi.hadoop.HiveHoodieTableFileIndex;
 import org.apache.hudi.hadoop.HoodieCopyOnWriteTableInputFormat;
 import org.apache.hudi.hadoop.LocatedFileStatusWithBootstrapBaseFile;
 import org.apache.hudi.hadoop.RealtimeFileStatus;
 import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
 import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils;
 
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.FileSplit;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.SplitLocationInfo;
-import org.apache.hadoop.mapreduce.Job;
-
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -86,14 +86,14 @@ public class HoodieMergeOnReadTableInputFormat extends HoodieCopyOnWriteTableInp
   }
 
   @Override
-  protected FileStatus createFileStatusUnchecked(FileSlice fileSlice,
-                                                 Option<HoodieInstant> latestCompletedInstantOpt,
-                                                 String tableBasePath,
-                                                 Option<HoodieVirtualKeyInfo> virtualKeyInfoOpt) {
+  protected FileStatus createFileStatusUnchecked(FileSlice fileSlice, HiveHoodieTableFileIndex fileIndex, Option<HoodieVirtualKeyInfo> virtualKeyInfoOpt) {
     Option<HoodieBaseFile> baseFileOpt = fileSlice.getBaseFile();
     Option<HoodieLogFile> latestLogFileOpt = fileSlice.getLatestLogFile();
     Stream<HoodieLogFile> logFiles = fileSlice.getLogFiles();
 
+    Option<HoodieInstant> latestCompletedInstantOpt = fileIndex.getLatestCompletedInstant();
+    String tableBasePath = fileIndex.getBasePath().toString();
+
     // Check if we're reading a MOR table
     if (baseFileOpt.isPresent()) {
       return createRealtimeFileStatusUnchecked(baseFileOpt.get(), logFiles, tableBasePath, latestCompletedInstantOpt, virtualKeyInfoOpt);
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
index eeeedc061e..f9c2c9ca29 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
@@ -449,7 +449,7 @@ public class HoodieInputFormatUtils {
    * @param dataFile
    * @return
    */
-  public static HoodieBaseFile refreshFileStatus(Configuration conf, HoodieBaseFile dataFile) {
+  private static HoodieBaseFile refreshFileStatus(Configuration conf, HoodieBaseFile dataFile) {
     Path dataPath = dataFile.getFileStatus().getPath();
     try {
       if (dataFile.getFileSize() == 0) {
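
Note for readers skimming the patch: the reverted change had gated the COW snapshot listing on the metadata table and otherwise fell back to a cached in-memory HoodieTableFileSystemView; after this revert every table goes back through HiveHoodieTableFileIndex. Below is a condensed sketch of the restored listing flow, assembled from the '+' hunks above; the enclosing method, loop, and locals (engineContext, props, partitionPaths, queryCommitInstant, shouldIncludePendingCommits, targetFiles) are assumed from the surrounding diff context rather than reproduced in full.

    // Sketch only: fragment of the restored per-table listing loop, per the '+' lines above.
    HiveHoodieTableFileIndex fileIndex =
        new HiveHoodieTableFileIndex(
            engineContext,
            tableMetaClient,
            props,
            HoodieTableQueryType.SNAPSHOT,
            partitionPaths,
            queryCommitInstant,
            shouldIncludePendingCommits);

    // All file slices are listed through the file index again; the removed branch that built
    // FileSystemViewManager.createInMemoryFileSystemViewWithTimeline(...) is gone after the revert.
    Map<String, List<FileSlice>> partitionedFileSlices = fileIndex.listFileSlices();
    Option<HoodieVirtualKeyInfo> virtualKeyInfoOpt = getHoodieVirtualKeyInfo(tableMetaClient);

    targetFiles.addAll(
        partitionedFileSlices.values().stream()
            .flatMap(Collection::stream)
            .filter(fileSlice -> checkIfValidFileSlice(fileSlice))
            .map(fileSlice -> createFileStatusUnchecked(fileSlice, fileIndex, virtualKeyInfoOpt))
            .collect(Collectors.toList()));

Correspondingly, createFileStatusUnchecked in both HoodieCopyOnWriteTableInputFormat and HoodieMergeOnReadTableInputFormat once again takes the HiveHoodieTableFileIndex instead of (Option<HoodieInstant>, String basePath), and HoodieInputFormatUtils.refreshFileStatus returns to private visibility.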
