nsivabalan commented on code in PR #8490: URL: https://github.com/apache/hudi/pull/8490#discussion_r1177166424
########## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergedReadHandle.java: ########## @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.io; + +import org.apache.hudi.avro.HoodieAvroUtils; +import org.apache.hudi.common.model.FileSlice; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordLocation; +import org.apache.hudi.common.model.HoodieRecordMerger; +import org.apache.hudi.common.table.HoodieTableConfig; +import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.ClosableIterator; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieIndexException; +import org.apache.hudi.io.storage.HoodieFileReader; +import org.apache.hudi.table.HoodieTable; + +import org.apache.avro.Schema; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static 
java.util.stream.Collectors.toList; +import static org.apache.hudi.common.util.StringUtils.nonEmpty; +import static org.apache.hudi.common.util.ValidationUtils.checkState; + +public class HoodieMergedReadHandle<T, I, K, O> extends HoodieReadHandle<T, I, K, O> { + + protected final Schema readerSchema; + + public HoodieMergedReadHandle(HoodieWriteConfig config, + Option<String> instantTime, + HoodieTable<T, I, K, O> hoodieTable, + Pair<String, String> partitionPathFileIDPair) { + super(config, instantTime, hoodieTable, partitionPathFileIDPair); + readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema()), config.allowOperationMetadataField()); + } + + public List<HoodieRecord<T>> getMergedRecords() { + Option<FileSlice> fileSliceOpt = getLatestFileSlice(); + if (!fileSliceOpt.isPresent()) { + return Collections.emptyList(); + } + checkState(nonEmpty(instantTime), String.format("Expected a valid instant time but got `%s`", instantTime)); + final FileSlice fileSlice = fileSliceOpt.get(); + final HoodieRecordLocation currentLocation = new HoodieRecordLocation(instantTime, fileSlice.getFileId()); + Option<HoodieFileReader> baseFileReader = Option.empty(); + HoodieMergedLogRecordScanner logRecordScanner = null; + try { + baseFileReader = getBaseFileReader(fileSlice); + logRecordScanner = getLogRecordScanner(fileSlice); + List<HoodieRecord<T>> mergedRecords = new ArrayList<>(); + doMergedRead(baseFileReader, logRecordScanner).forEach(r -> { + r.unseal(); + r.setCurrentLocation(currentLocation); + r.seal(); + mergedRecords.add(r); + }); + return mergedRecords; + } catch (IOException e) { + throw new HoodieIndexException("Error in reading " + fileSlice, e); + } finally { + if (baseFileReader.isPresent()) { + baseFileReader.get().close(); + } + if (logRecordScanner != null) { + logRecordScanner.close(); + } + } + } + + private Option<FileSlice> getLatestFileSlice() { + if (nonEmpty(instantTime) + && 
hoodieTable.getMetaClient().getCommitsTimeline().filterCompletedInstants().lastInstant().isPresent()) { + return Option.fromJavaOptional(hoodieTable + .getHoodieView() + .getLatestMergedFileSlicesBeforeOrOn(partitionPathFileIDPair.getLeft(), instantTime) Review Comment: In HoodieIndexUtils, we do call getLatestBaseFiles() to populate List<Pair<String, HoodieBaseFile>>. Can we fix that call to call getLatestMergedFileSlicesBeforeOrOn instead, and then re-use the output here instead of making another call to HoodieView.getLatestMergedFileSlicesBeforeOrOn? ########## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergedReadHandle.java: ########## @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.hudi.io; + +import org.apache.hudi.avro.HoodieAvroUtils; +import org.apache.hudi.common.model.FileSlice; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordLocation; +import org.apache.hudi.common.model.HoodieRecordMerger; +import org.apache.hudi.common.table.HoodieTableConfig; +import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.ClosableIterator; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieIndexException; +import org.apache.hudi.io.storage.HoodieFileReader; +import org.apache.hudi.table.HoodieTable; + +import org.apache.avro.Schema; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static java.util.stream.Collectors.toList; +import static org.apache.hudi.common.util.StringUtils.nonEmpty; +import static org.apache.hudi.common.util.ValidationUtils.checkState; + +public class HoodieMergedReadHandle<T, I, K, O> extends HoodieReadHandle<T, I, K, O> { Review Comment: may I know in what way this is different from HoodieFileSliceReader ? 
https://github.com/apache/hudi/blob/master/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/common/table/log/HoodieFileSliceReader.java ########## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java: ########## @@ -175,4 +186,68 @@ public static boolean checkIfValidCommit(HoodieTimeline commitTimeline, String c // 2) is less than the first commit ts in the timeline return !commitTimeline.empty() && commitTimeline.containsOrBeforeTimelineStarts(commitTs); } + + public static <R> HoodieData<HoodieRecord<R>> getTaggedRecordsFromPartitionLocations( Review Comment: I feel we can do this in HoodieBloomIndexCheckFunction and HoodieKeyLookupHandle only, and do it in one shot. As of the current patch, we fetch the locations based on just the base files and get the locations once, and then call into getTaggedRecordsFromPartitionLocations, which will again call getLatestFileSlice() and do a snapshot read of base and log files to finalize record locations. ########## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergedReadHandle.java: ########## @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.hudi.io; + +import org.apache.hudi.avro.HoodieAvroUtils; +import org.apache.hudi.common.model.FileSlice; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordLocation; +import org.apache.hudi.common.model.HoodieRecordMerger; +import org.apache.hudi.common.table.HoodieTableConfig; +import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.ClosableIterator; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieIndexException; +import org.apache.hudi.io.storage.HoodieFileReader; +import org.apache.hudi.table.HoodieTable; + +import org.apache.avro.Schema; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static java.util.stream.Collectors.toList; +import static org.apache.hudi.common.util.StringUtils.nonEmpty; +import static org.apache.hudi.common.util.ValidationUtils.checkState; + +public class HoodieMergedReadHandle<T, I, K, O> extends HoodieReadHandle<T, I, K, O> { Review Comment: Generally, this file feels complicated. Already, the LogRecordReader gives us the merged snapshot of all log files. So, all we need to do here is read records from the base file and merge w/ the log records. Let's compare it w/ HoodieFileSliceReader and see if we can simplify things. 
########## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java: ########## @@ -175,4 +186,68 @@ public static boolean checkIfValidCommit(HoodieTimeline commitTimeline, String c // 2) is less than the first commit ts in the timeline return !commitTimeline.empty() && commitTimeline.containsOrBeforeTimelineStarts(commitTs); } + + public static <R> HoodieData<HoodieRecord<R>> getTaggedRecordsFromPartitionLocations( + HoodieData<Pair<String, HoodieRecordLocation>> partitionLocations, HoodieWriteConfig config, HoodieTable hoodieTable) { + final Option<String> instantTime = hoodieTable + .getMetaClient() + .getCommitsTimeline() + .filterCompletedInstants() + .lastInstant() + .map(HoodieInstant::getTimestamp); + return partitionLocations.flatMap(p -> { + String partitionPath = p.getLeft(); + String fileId = p.getRight().getFileId(); + return new HoodieMergedReadHandle(config, instantTime, hoodieTable, Pair.of(partitionPath, fileId)) + .getMergedRecords().iterator(); + }); + } + + public static <R> HoodieData<HoodieRecord<R>> mergeForPartitionUpdates( + HoodieData<Pair<HoodieRecord<R>, Option<Pair<String, HoodieRecordLocation>>>> taggedHoodieRecords, HoodieWriteConfig config, HoodieTable hoodieTable) { + // completely new records + HoodieData<HoodieRecord<R>> newRecords = taggedHoodieRecords.filter(p -> !p.getRight().isPresent()).map(Pair::getLeft); Review Comment: We can avoid all of these if our original index lookup (findMatchingFilesForRecordKeys() — in other words, if we fix the BloomIndexCheckFunction and keyLookupHandle) returns a valid record-to-location mapping. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org