nsivabalan commented on code in PR #8490:
URL: https://github.com/apache/hudi/pull/8490#discussion_r1177166424


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergedReadHandle.java:
##########
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.io;
+
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.io.storage.HoodieFileReader;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.avro.Schema;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static java.util.stream.Collectors.toList;
+import static org.apache.hudi.common.util.StringUtils.nonEmpty;
+import static org.apache.hudi.common.util.ValidationUtils.checkState;
+
+public class HoodieMergedReadHandle<T, I, K, O> extends HoodieReadHandle<T, I, 
K, O> {
+
+  protected final Schema readerSchema;
+
+  public HoodieMergedReadHandle(HoodieWriteConfig config,
+                                Option<String> instantTime,
+                                HoodieTable<T, I, K, O> hoodieTable,
+                                Pair<String, String> partitionPathFileIDPair) {
+    super(config, instantTime, hoodieTable, partitionPathFileIDPair);
+    readerSchema = HoodieAvroUtils.addMetadataFields(new 
Schema.Parser().parse(config.getSchema()), 
config.allowOperationMetadataField());
+  }
+
+  public List<HoodieRecord<T>> getMergedRecords() {
+    Option<FileSlice> fileSliceOpt = getLatestFileSlice();
+    if (!fileSliceOpt.isPresent()) {
+      return Collections.emptyList();
+    }
+    checkState(nonEmpty(instantTime), String.format("Expected a valid instant 
time but got `%s`", instantTime));
+    final FileSlice fileSlice = fileSliceOpt.get();
+    final HoodieRecordLocation currentLocation = new 
HoodieRecordLocation(instantTime, fileSlice.getFileId());
+    Option<HoodieFileReader> baseFileReader = Option.empty();
+    HoodieMergedLogRecordScanner logRecordScanner = null;
+    try {
+      baseFileReader = getBaseFileReader(fileSlice);
+      logRecordScanner = getLogRecordScanner(fileSlice);
+      List<HoodieRecord<T>> mergedRecords = new ArrayList<>();
+      doMergedRead(baseFileReader, logRecordScanner).forEach(r -> {
+        r.unseal();
+        r.setCurrentLocation(currentLocation);
+        r.seal();
+        mergedRecords.add(r);
+      });
+      return mergedRecords;
+    } catch (IOException e) {
+      throw new HoodieIndexException("Error in reading " + fileSlice, e);
+    } finally {
+      if (baseFileReader.isPresent()) {
+        baseFileReader.get().close();
+      }
+      if (logRecordScanner != null) {
+        logRecordScanner.close();
+      }
+    }
+  }
+
+  private Option<FileSlice> getLatestFileSlice() {
+    if (nonEmpty(instantTime)
+        && 
hoodieTable.getMetaClient().getCommitsTimeline().filterCompletedInstants().lastInstant().isPresent())
 {
+      return Option.fromJavaOptional(hoodieTable
+          .getHoodieView()
+          
.getLatestMergedFileSlicesBeforeOrOn(partitionPathFileIDPair.getLeft(), 
instantTime)

Review Comment:
   In HoodieIndexUtils, we currently call getLatestBaseFiles() to populate 
List<Pair<String, HoodieBaseFile>>. Can we change that call to 
getLatestMergedFileSlicesBeforeOrOn() instead, and then re-use its output here 
rather than making another call to 
HoodieView.getLatestMergedFileSlicesBeforeOrOn?  
   



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergedReadHandle.java:
##########
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.io;
+
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.io.storage.HoodieFileReader;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.avro.Schema;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static java.util.stream.Collectors.toList;
+import static org.apache.hudi.common.util.StringUtils.nonEmpty;
+import static org.apache.hudi.common.util.ValidationUtils.checkState;
+
+public class HoodieMergedReadHandle<T, I, K, O> extends HoodieReadHandle<T, I, 
K, O> {

Review Comment:
   May I know in what way this is different from HoodieFileSliceReader? 
https://github.com/apache/hudi/blob/master/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/common/table/log/HoodieFileSliceReader.java



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java:
##########
@@ -175,4 +186,68 @@ public static boolean checkIfValidCommit(HoodieTimeline 
commitTimeline, String c
     // 2) is less than the first commit ts in the timeline
     return !commitTimeline.empty() && 
commitTimeline.containsOrBeforeTimelineStarts(commitTs);
   }
+
+  public static <R> HoodieData<HoodieRecord<R>> 
getTaggedRecordsFromPartitionLocations(

Review Comment:
   I feel we can do this in HoodieBloomIndexCheckFunction and 
HoodieKeyLookupHandle only, and do it in one shot. As of the current patch, we 
fetch the locations based on just the base files, get the locations once, and 
then call into getTaggedRecordsFromPartitionLocations, which will again invoke 
getLatestFileSlice() and perform a snapshot read of the base and log files to 
finalize the record locations. 
   



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergedReadHandle.java:
##########
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.io;
+
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.io.storage.HoodieFileReader;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.avro.Schema;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static java.util.stream.Collectors.toList;
+import static org.apache.hudi.common.util.StringUtils.nonEmpty;
+import static org.apache.hudi.common.util.ValidationUtils.checkState;
+
+public class HoodieMergedReadHandle<T, I, K, O> extends HoodieReadHandle<T, I, 
K, O> {

Review Comment:
   Generally, this file feels complicated. The LogRecordReader already gives us 
the merged snapshot of all log files, so all we need to do here is read 
records from the base file and merge them with the log records. Let's compare it 
with HoodieFileSliceReader and see if we can simplify things. 



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java:
##########
@@ -175,4 +186,68 @@ public static boolean checkIfValidCommit(HoodieTimeline 
commitTimeline, String c
     // 2) is less than the first commit ts in the timeline
     return !commitTimeline.empty() && 
commitTimeline.containsOrBeforeTimelineStarts(commitTs);
   }
+
+  public static <R> HoodieData<HoodieRecord<R>> 
getTaggedRecordsFromPartitionLocations(
+      HoodieData<Pair<String, HoodieRecordLocation>> partitionLocations, 
HoodieWriteConfig config, HoodieTable hoodieTable) {
+    final Option<String> instantTime = hoodieTable
+        .getMetaClient()
+        .getCommitsTimeline()
+        .filterCompletedInstants()
+        .lastInstant()
+        .map(HoodieInstant::getTimestamp);
+    return partitionLocations.flatMap(p -> {
+      String partitionPath = p.getLeft();
+      String fileId = p.getRight().getFileId();
+      return new HoodieMergedReadHandle(config, instantTime, hoodieTable, 
Pair.of(partitionPath, fileId))
+          .getMergedRecords().iterator();
+    });
+  }
+
+  public static <R> HoodieData<HoodieRecord<R>> mergeForPartitionUpdates(
+      HoodieData<Pair<HoodieRecord<R>, Option<Pair<String, 
HoodieRecordLocation>>>> taggedHoodieRecords, HoodieWriteConfig config, 
HoodieTable hoodieTable) {
+    // completely new records
+    HoodieData<HoodieRecord<R>> newRecords = taggedHoodieRecords.filter(p -> 
!p.getRight().isPresent()).map(Pair::getLeft);

Review Comment:
   We can avoid all of this if our original index lookup 
(findMatchingFilesForRecordKeys() — in other words, if we fix 
BloomIndexCheckFunction and KeyLookupHandle) returns a valid record-to-location 
mapping.  
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to