[GitHub] [hudi] alexeykudinkin commented on a change in pull request #4556: [HUDI-3191] Removing duplicating file-listing process w/in Hive's MOR `FileInputFormat`s
alexeykudinkin commented on a change in pull request #4556:
URL: https://github.com/apache/hudi/pull/4556#discussion_r798204981

## File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java

## @@ -65,11 +65,70 @@
 import java.util.stream.Collectors;
 import java.util.stream.Stream;

+import static org.apache.hudi.TypeUtils.unsafeCast;
+import static org.apache.hudi.common.util.ValidationUtils.checkState;
+
 public class HoodieRealtimeInputFormatUtils extends HoodieInputFormatUtils {

   private static final Logger LOG = LogManager.getLogger(HoodieRealtimeInputFormatUtils.class);

-  public static InputSplit[] getRealtimeSplits(Configuration conf, Stream<FileSplit> fileSplits) {
+  public static InputSplit[] getRealtimeSplits(Configuration conf, List<FileSplit> fileSplits) throws IOException {
+    if (fileSplits.isEmpty()) {
+      return new InputSplit[0];
+    }
+
+    FileSplit fileSplit = fileSplits.get(0);
+
+    // Pre-process table-config to fetch virtual key info
+    Path partitionPath = fileSplit.getPath().getParent();
+    HoodieTableMetaClient metaClient = getTableMetaClientForBasePathUnchecked(conf, partitionPath);
+
+    Option<HoodieVirtualKeyInfo> hoodieVirtualKeyInfoOpt = getHoodieVirtualKeyInfo(metaClient);
+
+    // NOTE: This timeline is kept in sync w/ {@code HoodieTableFileIndexBase}
+    HoodieInstant latestCommitInstant =
+        metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().lastInstant().get();
+
+    InputSplit[] finalSplits = fileSplits.stream()
+        .map(split -> {
+          // There are 4 types of splits we could have to handle here:
+          //    - {@code BootstrapBaseFileSplit}: in case base file does have associated bootstrap file,
+          //      but does NOT have any log files appended (convert it to {@code RealtimeBootstrapBaseFileSplit})
+          //    - {@code RealtimeBootstrapBaseFileSplit}: in case base file does have associated bootstrap file
+          //      and does have log files appended
+          //    - {@code BaseFileWithLogsSplit}: in case base file does NOT have associated bootstrap file
+          //      and does have log files appended
+          //    - {@code FileSplit}: in case Hive passed down a non-Hudi path
+          if (split instanceof RealtimeBootstrapBaseFileSplit) {
+            return split;
+          } else if (split instanceof BootstrapBaseFileSplit) {
+            BootstrapBaseFileSplit bootstrapBaseFileSplit = unsafeCast(split);
+            return createRealtimeBoostrapBaseFileSplit(
+                bootstrapBaseFileSplit,
+                metaClient.getBasePath(),
+                Collections.emptyList(),
+                latestCommitInstant.getTimestamp(),
+                false);
+          } else if (split instanceof BaseFileWithLogsSplit) {
+            BaseFileWithLogsSplit baseFileWithLogsSplit = unsafeCast(split);

Review comment: Yes, it's in sync. However, you brought up a very good point that the instant shouldn't actually be set here. This will be cleaned up in subsequent PRs, where `HoodieRealtimeFileSplit` will be merged with `BaseFileWithLogsSplit`.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
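The four-way branching in the diff above can be reduced to a simple `instanceof` dispatch. Below is a sketch with simplified stub classes (not the real Hadoop/Hudi types), assuming -- as the check order in the diff suggests -- that `RealtimeBootstrapBaseFileSplit` specializes `BootstrapBaseFileSplit`, so the realtime variant must be tested first:

```java
// Hypothetical stand-ins for Hudi's split types; names mirror the PR, but these
// are simplified stubs, not the real org.apache.hudi.hadoop classes.
class FileSplit {}
class BootstrapBaseFileSplit extends FileSplit {}
class RealtimeBootstrapBaseFileSplit extends BootstrapBaseFileSplit {}
class BaseFileWithLogsSplit extends FileSplit {}

class SplitDispatchSketch {
  // Classifies a split the way getRealtimeSplits() branches on it. Order matters:
  // RealtimeBootstrapBaseFileSplit is-a BootstrapBaseFileSplit, so checking the
  // bootstrap case first would wrongly re-convert an already-realtime split.
  static String classify(FileSplit split) {
    if (split instanceof RealtimeBootstrapBaseFileSplit) {
      return "pass-through";                  // already realtime: keep as-is
    } else if (split instanceof BootstrapBaseFileSplit) {
      return "convert-to-realtime-bootstrap"; // wrap, with an empty log-file list
    } else if (split instanceof BaseFileWithLogsSplit) {
      return "convert-to-realtime";           // attach its appended delta log files
    } else {
      return "non-hudi";                      // plain FileSplit handed down by Hive
    }
  }

  public static void main(String[] args) {
    System.out.println(classify(new RealtimeBootstrapBaseFileSplit())); // pass-through
    System.out.println(classify(new BootstrapBaseFileSplit()));
    System.out.println(classify(new BaseFileWithLogsSplit()));
    System.out.println(classify(new FileSplit()));
  }
}
```

The sketch only shows why the branch ordering in the diff is load-bearing; the real code additionally threads the meta-client, timeline instant, and virtual-key info through each conversion.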
[GitHub] [hudi] alexeykudinkin commented on a change in pull request #4556: [HUDI-3191] Removing duplicating file-listing process w/in Hive's MOR `FileInputFormat`s
alexeykudinkin commented on a change in pull request #4556:
URL: https://github.com/apache/hudi/pull/4556#discussion_r798204981

## File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java

Review comment: It does
[GitHub] [hudi] alexeykudinkin commented on a change in pull request #4556: [HUDI-3191] Removing duplicating file-listing process w/in Hive's MOR `FileInputFormat`s
alexeykudinkin commented on a change in pull request #4556:
URL: https://github.com/apache/hudi/pull/4556#discussion_r798092122

## File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieRealtimeFileSplit.java

## @@ -44,9 +44,7 @@
   private Option<HoodieVirtualKeyInfo> hoodieVirtualKeyInfo = Option.empty();

-  public HoodieRealtimeFileSplit() {
-    super();

Review comment: We don't need to remove it, but there's also no point in keeping it.

## File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java

## @@ -144,28 +204,32 @@
     return rtSplits.toArray(new InputSplit[0]);
   }

+  /**
+   * @deprecated will be replaced w/ {@link #getRealtimeSplits(Configuration, List)}
+   */
   // get IncrementalRealtimeSplits
-  public static InputSplit[] getIncrementalRealtimeSplits(Configuration conf, Stream<FileSplit> fileSplits) throws IOException {
+  public static InputSplit[] getIncrementalRealtimeSplits(Configuration conf, List<FileSplit> fileSplits) throws IOException {
+    checkState(fileSplits.stream().allMatch(HoodieRealtimeInputFormatUtils::doesBelongToIncrementalQuery),
+        "All splits have to belong to incremental query");
+
     List<InputSplit> rtSplits = new ArrayList<>();
-    List<FileSplit> fileSplitList = fileSplits.collect(Collectors.toList());
-    Set<Path> partitionSet = fileSplitList.stream().map(f -> f.getPath().getParent()).collect(Collectors.toSet());
+    Set<Path> partitionSet = fileSplits.stream().map(f -> f.getPath().getParent()).collect(Collectors.toSet());
     Map<Path, HoodieTableMetaClient> partitionsToMetaClient = getTableMetaClientByPartitionPath(conf, partitionSet);
     // Pre process tableConfig from first partition to fetch virtual key info
     Option<HoodieVirtualKeyInfo> hoodieVirtualKeyInfo = Option.empty();
     if (partitionSet.size() > 0) {
       hoodieVirtualKeyInfo = getHoodieVirtualKeyInfo(partitionsToMetaClient.get(partitionSet.iterator().next()));
     }
     Option<HoodieVirtualKeyInfo> finalHoodieVirtualKeyInfo = hoodieVirtualKeyInfo;
-    fileSplitList.stream().forEach(s -> {
+    fileSplits.stream().forEach(s -> {
       // deal with incremental query.
       try {
         if (s instanceof BaseFileWithLogsSplit) {
-          BaseFileWithLogsSplit bs = (BaseFileWithLogsSplit) s;
-          if (bs.getBelongToIncrementalSplit()) {
-            rtSplits.add(new HoodieRealtimeFileSplit(bs, bs.getBasePath(), bs.getDeltaLogFiles(), bs.getMaxCommitTime(), finalHoodieVirtualKeyInfo));
-          }
+          BaseFileWithLogsSplit bs = unsafeCast(s);
+          rtSplits.add(new HoodieRealtimeFileSplit(bs, bs.getBasePath(), bs.getDeltaLogFiles(), bs.getMaxCommitTime(), finalHoodieVirtualKeyInfo));
         } else if (s instanceof RealtimeBootstrapBaseFileSplit) {
-          rtSplits.add(s);
+          RealtimeBootstrapBaseFileSplit bs = unsafeCast(s);

Review comment: I see now. Makes sense.

## File path: hudi-common/src/main/scala/org/apache/hudi/HoodieTableFileIndexBase.scala

## @@ -87,6 +89,12 @@
   refresh0()

+  /**
+   * Returns latest completed instant as seen by this instance of the file-index
+   */
+  def latestCompletedInstant(): Option[HoodieInstant] =

Review comment: It's definitely closer to the former.
[GitHub] [hudi] alexeykudinkin commented on a change in pull request #4556: [HUDI-3191] Removing duplicating file-listing process w/in Hive's MOR `FileInputFormat`s
alexeykudinkin commented on a change in pull request #4556:
URL: https://github.com/apache/hudi/pull/4556#discussion_r798022835

## File path: hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java

## @@ -516,8 +556,6 @@ void testMORTableRestore(boolean restoreAfterCompaction) throws Exception {
     JavaRDD<HoodieRecord> writeRecords = jsc().parallelize(records, 1);
     JavaRDD<WriteStatus> writeStatusJavaRDD = client.upsert(writeRecords, newCommitTime);
     client.commit(newCommitTime, writeStatusJavaRDD);
-    List<WriteStatus> statuses = writeStatusJavaRDD.collect();
-    assertNoWriteErrors(statuses);

Review comment: Yeah, though this is a coarse hack (just to keep the tests mostly intact). We should not actually dereference RDDs here, since that couples the tests not to the API but to the current implementation, which might change in the future.
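The hazard behind this comment -- consuming the same lazy collection more than once -- is easy to demonstrate with plain `java.util.stream` as a stand-in for the RDD. The key difference: a Java stream throws `IllegalStateException` on reuse, whereas a Spark RDD silently recomputes the whole lineage (re-running the upsert's side effects), which is exactly what double-wrote the base files in these tests. A minimal sketch, with a counter standing in for the write side effect:

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class LazyReuseSketch {
  // Counts "file writes" -- a stand-in for the side effect upsert() has in Spark.
  static int writes = 0;

  // Simulates a lazily-evaluated write stage producing two base files.
  static Stream<String> upsertLike() {
    return Stream.of("f1", "f2").peek(f -> writes++);
  }

  public static void main(String[] args) {
    Stream<String> statuses = upsertLike();
    statuses.collect(Collectors.toList());   // first consumption: the "commit"; writes == 2

    boolean reuseFailed = false;
    try {
      statuses.collect(Collectors.toList()); // second consumption: the test's own collect()
    } catch (IllegalStateException e) {
      reuseFailed = true;                    // streams refuse reuse outright...
    }
    // ...whereas an RDD would silently re-run the upsert, doubling the writes.
    // The fix applied in the tests: materialize once, then reuse the list.
    List<String> collected = upsertLike().collect(Collectors.toList()); // writes == 4
    // assertNoWriteErrors(collected); client.commit(time, parallelize(collected));
    System.out.println(reuseFailed);         // prints true
  }
}
```

The `upsertLike`/`writes` names are illustrative only; the point is the "collect once, re-parallelize the materialized result" pattern the PR applies to the rollback tests.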
[GitHub] [hudi] alexeykudinkin commented on a change in pull request #4556: [HUDI-3191] Removing duplicating file-listing process w/in Hive's MOR `FileInputFormat`s
alexeykudinkin commented on a change in pull request #4556:
URL: https://github.com/apache/hudi/pull/4556#discussion_r791350112

## File path: hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java

## @@ -189,8 +190,10 @@ public void testUpsertPartitioner(boolean populateMetaFields) throws Exception {
     assertTrue(fileIdToNewSize.entrySet().stream().anyMatch(entry -> fileIdToSize.get(entry.getKey()) < entry.getValue()));

-    List<String> dataFiles = roView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList());
-    List<GenericRecord> recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(hadoopConf(), dataFiles,
+    List<String> inputPaths = roView.getLatestBaseFiles()
+        .map(baseFile -> new Path(baseFile.getPath()).getParent().toString())
+        .collect(Collectors.toList());

Review comment: Yes, these are correct -- previously they were working only b/c we did the double file-listing (w/in `getRealtimeSplits`). I have to pass _partition paths_, not base-file paths.

## File path: hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java

## @@ -166,10 +169,12 @@ void testRollbackWithDeltaAndCompactionCommit(boolean rollbackUsingMarkers) thro
     JavaRDD<HoodieRecord> writeRecords = jsc().parallelize(records, 1);

     JavaRDD<WriteStatus> writeStatusJavaRDD = client.upsert(writeRecords, newCommitTime);
-    client.commit(newCommitTime, writeStatusJavaRDD);
+    List<WriteStatus> statuses = writeStatusJavaRDD.collect();
     assertNoWriteErrors(statuses);

+    client.commit(newCommitTime, jsc().parallelize(statuses));
+

Review comment: These tests were actually written incorrectly -- they dereference the RDD twice: w/in `commit` and when they collect w/in the test itself. This leads to the same base-files being written twice, which in turn fails the assertion I put in place to make sure that the legacy flow and the new one yield identical results.

## @@ -201,8 +206,10 @@ void testRollbackWithDeltaAndCompactionCommit(boolean rollbackUsingMarkers) thro
     copyOfRecords = dataGen.generateUpdates(commitTime1, copyOfRecords);
     copyOfRecords.addAll(dataGen.generateInserts(commitTime1, 200));

-    List<String> dataFiles = tableView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList());
-    List<GenericRecord> recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(hadoopConf(), dataFiles,
+    List<String> inputPaths = tableView.getLatestBaseFiles()
+        .map(baseFile -> new Path(baseFile.getPath()).getParent().toString())
+        .collect(Collectors.toList());

Review comment: Preserving existing behavior (hence keeping lists instead of Sets).

## @@ -268,11 +281,13 @@ void testRollbackWithDeltaAndCompactionCommit(boolean rollbackUsingMarkers) thro
     thirdClient.startCommitWithTime(newCommitTime);

     writeStatusJavaRDD = thirdClient.upsert(writeRecords, newCommitTime);
+    statuses = writeStatusJavaRDD.collect();
-    thirdClient.commit(newCommitTime, writeStatusJavaRDD);
     // Verify there are no errors
     assertNoWriteErrors(statuses);

+    thirdClient.commit(newCommitTime, jsc().parallelize(statuses));
+

Review comment: Replied above.
[GitHub] [hudi] alexeykudinkin commented on a change in pull request #4556: [HUDI-3191] Removing duplicating file-listing process w/in Hive's MOR `FileInputFormat`s
alexeykudinkin commented on a change in pull request #4556:
URL: https://github.com/apache/hudi/pull/4556#discussion_r791356926

## File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieFileInputFormatBase.java

## @@ -143,6 +121,124 @@
     return returns.toArray(new FileStatus[0]);
   }

+  private void validate(List<FileStatus> targetFiles, List<FileStatus> legacyFileStatuses) {

Review comment: Moved methods aren't actually changing in here.

## File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieFileInputFormatBase.java

## @@ -143,6 +121,127 @@
     return returns.toArray(new FileStatus[0]);
   }

+  private void validate(List<FileStatus> targetFiles, List<FileStatus> legacyFileStatuses) {
+    List<FileStatus> diff = CollectionUtils.diff(targetFiles, legacyFileStatuses);
+    checkState(diff.isEmpty(), "Should be empty");
+  }
+
+  @Nonnull
+  private static FileStatus getFileStatusUnchecked(HoodieBaseFile baseFile) {
+    try {
+      return HoodieInputFormatUtils.getFileStatus(baseFile);
+    } catch (IOException ioe) {
+      throw new HoodieIOException("Failed to get file-status", ioe);
+    }
+  }
+
+  /**
+   * Abstracts and exposes {@link FileInputFormat#listStatus(JobConf)} operation to subclasses that
+   * lists files (returning an array of {@link FileStatus}) corresponding to the input paths specified
+   * as part of provided {@link JobConf}
+   */
+  protected final FileStatus[] doListStatus(JobConf job) throws IOException {
+    return super.listStatus(job);
+  }
+
+  /**
+   * Achieves listStatus functionality for an incrementally queried table. Instead of listing all
+   * partitions and then filtering based on the commits of interest, this logic first extracts the
+   * partitions touched by the desired commits and then lists only those partitions.
+   */
+  protected List<FileStatus> listStatusForIncrementalMode(JobConf job,
+                                                          HoodieTableMetaClient tableMetaClient,
+                                                          List<Path> inputPaths,
+                                                          String incrementalTable) throws IOException {
+    String tableName = tableMetaClient.getTableConfig().getTableName();
+    Job jobContext = Job.getInstance(job);
+    Option<HoodieTimeline> timeline = HoodieInputFormatUtils.getFilteredCommitsTimeline(jobContext, tableMetaClient);
+    if (!timeline.isPresent()) {
+      return null;
+    }
+    Option<List<HoodieInstant>> commitsToCheck = HoodieInputFormatUtils.getCommitsForIncrementalQuery(jobContext, tableName, timeline.get());
+    if (!commitsToCheck.isPresent()) {
+      return null;
+    }
+    Option<String> incrementalInputPaths = HoodieInputFormatUtils.getAffectedPartitions(commitsToCheck.get(), tableMetaClient, timeline.get(), inputPaths);
+    // Mutate the JobConf to set the input paths to only partitions touched by incremental pull.
+    if (!incrementalInputPaths.isPresent()) {
+      return null;
+    }
+    setInputPaths(job, incrementalInputPaths.get());
+    FileStatus[] fileStatuses = doListStatus(job);
+    return HoodieInputFormatUtils.filterIncrementalFileStatus(jobContext, tableMetaClient, timeline.get(), fileStatuses, commitsToCheck.get());
+  }
+
+  protected abstract boolean includeLogFilesForSnapshotView();
+
+  @Nonnull
+  private static RealtimeFileStatus createRealtimeFileStatusUnchecked(HoodieBaseFile baseFile,
+                                                                      Stream<HoodieLogFile> logFiles,
+                                                                      Option<HoodieInstant> latestCompletedInstantOpt,
+                                                                      HoodieTableMetaClient tableMetaClient) {
+    List<HoodieLogFile> sortedLogFiles = logFiles.sorted(HoodieLogFile.getLogFileComparator()).collect(Collectors.toList());
+    FileStatus baseFileStatus = getFileStatusUnchecked(baseFile);
+    try {
+      RealtimeFileStatus rtFileStatus = new RealtimeFileStatus(baseFileStatus);
+      rtFileStatus.setDeltaLogFiles(sortedLogFiles);
+      rtFileStatus.setBaseFilePath(baseFile.getPath());
+      rtFileStatus.setBasePath(tableMetaClient.getBasePath());
+
+      if (latestCompletedInstantOpt.isPresent()) {
+        HoodieInstant latestCompletedInstant = latestCompletedInstantOpt.get();
+        checkState(latestCompletedInstant.isCompleted());
+
+        rtFileStatus.setMaxCommitTime(latestCompletedInstant.getTimestamp());
+      }
+
+      if (baseFileStatus instanceof LocatedFileStatusWithBootstrapBaseFile || baseFileStatus instanceof FileStatusWithBootstrapBaseFile) {
+        rtFileStatus.setBootStrapFileStatus(baseFileStatus);
+      }
+
+      return rtFileStatus;
+    } catch (IOException e) {
+      throw new HoodieIOException(String.format("Failed to init %s", RealtimeFileStatus.class.getSimpleName()), e);
+    }
+  }
+
+  @Nonnull
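The listing strategy described in `listStatusForIncrementalMode`'s javadoc -- derive the partitions touched by the commits of interest first, then list only those -- can be sketched independently of Hadoop. The commit-to-partitions map below is hypothetical; in Hudi this information comes from commit metadata (via `getAffectedPartitions`):

```java
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;

class IncrementalListingSketch {
  // commitTime -> partitions written by that commit (stand-in for commit metadata).
  // Union the partitions of just the commits we care about, instead of listing
  // every partition and filtering afterwards.
  static Set<String> affectedPartitions(Map<String, List<String>> commitMeta,
                                        Collection<String> commitsToCheck) {
    return commitsToCheck.stream()
        .flatMap(c -> commitMeta.getOrDefault(c, List.of()).stream())
        .collect(Collectors.toCollection(TreeSet::new));
  }

  public static void main(String[] args) {
    Map<String, List<String>> commitMeta = Map.of(
        "001", List.of("2022/01/01"),
        "002", List.of("2022/01/01", "2022/01/02"),
        "003", List.of("2022/01/03"));
    // Only commits 002 and 003 are of interest -> list 3 partitions, not the whole table.
    System.out.println(affectedPartitions(commitMeta, List.of("002", "003")));
    // prints [2022/01/01, 2022/01/02, 2022/01/03]
  }
}
```

For tables with many partitions and few recent commits, this inversion is what makes incremental queries cheap: the file-system listing is proportional to the commits queried, not the table size.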
[GitHub] [hudi] alexeykudinkin commented on a change in pull request #4556: [HUDI-3191] Removing duplicating file-listing process w/in Hive's MOR `FileInputFormat`s
alexeykudinkin commented on a change in pull request #4556:
URL: https://github.com/apache/hudi/pull/4556#discussion_r791355209

## File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/BootstrapBaseFileSplit.java

## @@ -32,10 +32,6 @@
   private FileSplit bootstrapFileSplit;

-  public BootstrapBaseFileSplit() {
-    super();
-  }

Review comment: Yeah, this isn't used outside of our codebase.
[GitHub] [hudi] alexeykudinkin commented on a change in pull request #4556: [HUDI-3191] Removing duplicating file-listing process w/in Hive's MOR `FileInputFormat`s
alexeykudinkin commented on a change in pull request #4556:
URL: https://github.com/apache/hudi/pull/4556#discussion_r791354846

## File path: hudi-common/src/main/scala/org/apache/hudi/HoodieTableFileIndexBase.scala

## @@ -87,6 +89,12 @@
   refresh0()

+  /**
+   * Returns latest completed instant as seen by this instance of the file-index
+   */
+  def latestCompletedInstant(): Option[HoodieInstant] =

Review comment: I think this is the approach we do take in Java, but in Scala this seems to be off (since you don't add parentheses at the end of the method name, the get- prefix feels off).
[GitHub] [hudi] alexeykudinkin commented on a change in pull request #4556: [HUDI-3191] Removing duplicating file-listing process w/in Hive's MOR `FileInputFormat`s
alexeykudinkin commented on a change in pull request #4556:
URL: https://github.com/apache/hudi/pull/4556#discussion_r791354207

## File path: hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java

## @@ -361,15 +386,19 @@ void testMultiRollbackWithDeltaAndCompactionCommit() throws Exception {
     copyOfRecords = dataGen.generateUpdates(newCommitTime, copyOfRecords);
     copyOfRecords.addAll(dataGen.generateInserts(newCommitTime, 200));

-    List<String> dataFiles = tableView.getLatestBaseFiles().map(hf -> hf.getPath()).collect(Collectors.toList());
+    List<String> dataFiles = tableView.getLatestBaseFiles()
+        .map(baseFile -> new Path(baseFile.getPath()).getParent().toString())
+        .collect(Collectors.toList());
     List<GenericRecord> recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(hadoopConf(), dataFiles,
         basePath());
     assertEquals(200, recordsRead.size());

     statuses = nClient.upsert(jsc().parallelize(copyOfRecords, 1), newCommitTime).collect();
     // Verify there are no errors
     assertNoWriteErrors(statuses);
-    nClient.commit(newCommitTime, writeStatusJavaRDD);
+
+    nClient.commit(newCommitTime, jsc().parallelize(statuses));
+

Review comment: These are not cosmetic -- these are real test-failure fixes (they fix the base-files double-writing).
[GitHub] [hudi] alexeykudinkin commented on a change in pull request #4556: [HUDI-3191] Removing duplicating file-listing process w/in Hive's MOR `FileInputFormat`s
alexeykudinkin commented on a change in pull request #4556:
URL: https://github.com/apache/hudi/pull/4556#discussion_r791350725

## File path: hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java

## @@ -201,8 +206,10 @@
Review comment: Preserving existing behavior (hence keeping lists instead of Sets)

## @@ -268,11 +281,13 @@
Review comment: Replied above
[GitHub] [hudi] alexeykudinkin commented on a change in pull request #4556: [HUDI-3191] Removing duplicating file-listing process w/in Hive's MOR `FileInputFormat`s
alexeykudinkin commented on a change in pull request #4556:
URL: https://github.com/apache/hudi/pull/4556#discussion_r791350725

## File path: hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java

## @@ -201,8 +206,10 @@
Review comment: Preserving existing behavior
[GitHub] [hudi] alexeykudinkin commented on a change in pull request #4556: [HUDI-3191] Removing duplicating file-listing process w/in Hive's MOR `FileInputFormat`s
alexeykudinkin commented on a change in pull request #4556:
URL: https://github.com/apache/hudi/pull/4556#discussion_r791350598

## File path: hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java

## @@ -166,10 +169,12 @@ void testRollbackWithDeltaAndCompactionCommit(boolean rollbackUsingMarkers) thro
     JavaRDD<HoodieRecord> writeRecords = jsc().parallelize(records, 1);

     JavaRDD<WriteStatus> writeStatusJavaRDD = client.upsert(writeRecords, newCommitTime);
-    client.commit(newCommitTime, writeStatusJavaRDD);
+    List<WriteStatus> statuses = writeStatusJavaRDD.collect();
     assertNoWriteErrors(statuses);

+    client.commit(newCommitTime, jsc().parallelize(statuses));
+

Review comment: These tests were actually written incorrectly -- they dereference the RDD twice: w/in `commit` and when they collect w/in the test itself. This leads to the same base-files being written twice, which in turn fails the assertion I put in place to make sure that the legacy flow and the new one yield identical results.
[GitHub] [hudi] alexeykudinkin commented on a change in pull request #4556: [HUDI-3191] Removing duplicating file-listing process w/in Hive's MOR `FileInputFormat`s
alexeykudinkin commented on a change in pull request #4556:
URL: https://github.com/apache/hudi/pull/4556#discussion_r791350112

## File path: hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java

## @@ -189,8 +190,10 @@ public void testUpsertPartitioner(boolean populateMetaFields) throws Exception {
     assertTrue(fileIdToNewSize.entrySet().stream().anyMatch(entry -> fileIdToSize.get(entry.getKey()) < entry.getValue()));

-    List<String> dataFiles = roView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList());
-    List<GenericRecord> recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(hadoopConf(), dataFiles,
+    List<String> inputPaths = roView.getLatestBaseFiles()
+        .map(baseFile -> new Path(baseFile.getPath()).getParent().toString())
+        .collect(Collectors.toList());

Review comment: Yes, these are correct -- previously they were working only b/c we did the double file-listing (w/in `getRealtimeSplits`). I have to pass _partition paths_, not base-file paths.
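The fix described here -- handing the input format partition directories rather than base-file paths -- boils down to taking each file's parent. A sketch using `java.nio.file.Paths` as a stand-in for Hadoop's `Path` (whose `getParent()` behaves analogously for these layouts); note the fix deliberately keeps a `List` rather than deduplicating into a `Set`, preserving existing behavior:

```java
import java.nio.file.Paths;
import java.util.List;
import java.util.stream.Collectors;

class PartitionPathSketch {
  // Maps base-file paths to their partition directories, as the test fix does
  // with tableView.getLatestBaseFiles(). The file names below are made up.
  static List<String> toPartitionPaths(List<String> baseFilePaths) {
    return baseFilePaths.stream()
        .map(p -> Paths.get(p).getParent().toString())
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<String> baseFiles = List.of(
        "/tmp/hudi/2022/01/01/abc_1-0-1_001.parquet",
        "/tmp/hudi/2022/01/02/def_1-0-1_001.parquet");
    System.out.println(toPartitionPaths(baseFiles));
    // prints [/tmp/hudi/2022/01/01, /tmp/hudi/2022/01/02]
  }
}
```

Passing partition directories lets the input format itself discover base and log files in one listing pass, which is the whole point of the PR: the previous double file-listing in `getRealtimeSplits` is what made the old base-file-path inputs appear to work.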