This is an automated email from the ASF dual-hosted git repository. sivabalan pushed a commit to branch release-0.12.2-shadow in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/release-0.12.2-shadow by this push: new d3d427e179b reverting complicated patch 7340 and fixed some build failures d3d427e179b is described below commit d3d427e179b9ffb3dcf2cf764d401582f73352a8 Author: sivabalan <n.siv...@gmail.com> AuthorDate: Tue Dec 13 16:52:03 2022 -0800 reverting complicated patch 7340 and fixed some build failures --- .../apache/hudi/table/action/commit/FlinkMergeHelper.java | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkMergeHelper.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkMergeHelper.java index 01466484d63..733ad76b975 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkMergeHelper.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkMergeHelper.java @@ -18,7 +18,9 @@ package org.apache.hudi.table.action.commit; +import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.client.utils.MergingIterator; import org.apache.hudi.common.model.HoodieBaseFile; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; @@ -39,6 +41,7 @@ import org.apache.avro.generic.GenericRecord; import org.apache.avro.io.BinaryDecoder; import org.apache.avro.io.BinaryEncoder; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; import java.io.IOException; import java.util.Iterator; @@ -83,10 +86,15 @@ public class FlinkMergeHelper<T extends HoodieRecordPayload> extends BaseMergeHe BoundedInMemoryExecutor<GenericRecord, GenericRecord, Void> wrapper = null; Configuration cfgForHoodieFile = new Configuration(table.getHadoopConf()); HoodieFileReader<GenericRecord> reader = HoodieFileReaderFactory.<GenericRecord>getFileReader(cfgForHoodieFile, mergeHandle.getOldFilePath()); + HoodieFileReader<GenericRecord> bootstrapFileReader = null; try { final Iterator<GenericRecord> readerIterator; if (baseFile.getBootstrapBaseFile().isPresent()) { - readerIterator = getMergingIterator(table, mergeHandle, baseFile, reader, readSchema, externalSchemaTransformation); + Path bootstrapFilePath = new Path(baseFile.getBootstrapBaseFile().get().getPath()); + Configuration bootstrapFileConfig = new Configuration(table.getHadoopConf()); + bootstrapFileReader = HoodieFileReaderFactory.getFileReader(bootstrapFileConfig, bootstrapFilePath); + readerIterator = new MergingIterator<>(reader.getRecordIterator(), bootstrapFileReader.getRecordIterator(), + (inputRecordPair) -> HoodieAvroUtils.stitchRecords(inputRecordPair.getLeft(), inputRecordPair.getRight(), mergeHandle.getWriterSchemaWithMetaFields())); } else { readerIterator = reader.getRecordIterator(readSchema); } @@ -109,6 +117,9 @@ public class FlinkMergeHelper<T extends HoodieRecordPayload> extends BaseMergeHe if (reader != null) { reader.close(); } + if (bootstrapFileReader != null) { + bootstrapFileReader.close(); + } if (null != wrapper) { wrapper.shutdownNow(); wrapper.awaitTermination();