[GitHub] [hudi] alexeykudinkin commented on a change in pull request #4556: [HUDI-3191] Removing duplicating file-listing process w/in Hive's MOR `FileInputFormat`s

2022-02-02 Thread GitBox


alexeykudinkin commented on a change in pull request #4556:
URL: https://github.com/apache/hudi/pull/4556#discussion_r798204981



##
File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java
##
@@ -65,11 +65,70 @@
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import static org.apache.hudi.TypeUtils.unsafeCast;
+import static org.apache.hudi.common.util.ValidationUtils.checkState;
+
 public class HoodieRealtimeInputFormatUtils extends HoodieInputFormatUtils {
 
   private static final Logger LOG = LogManager.getLogger(HoodieRealtimeInputFormatUtils.class);
 
-  public static InputSplit[] getRealtimeSplits(Configuration conf, Stream<FileSplit> fileSplits) {
+  public static InputSplit[] getRealtimeSplits(Configuration conf, List<FileSplit> fileSplits) throws IOException {
+    if (fileSplits.isEmpty()) {
+      return new InputSplit[0];
+    }
+
+    FileSplit fileSplit = fileSplits.get(0);
+
+    // Pre-process table-config to fetch virtual key info
+    Path partitionPath = fileSplit.getPath().getParent();
+    HoodieTableMetaClient metaClient = getTableMetaClientForBasePathUnchecked(conf, partitionPath);
+
+    Option<HoodieVirtualKeyInfo> hoodieVirtualKeyInfoOpt = getHoodieVirtualKeyInfo(metaClient);
+
+    // NOTE: This timeline is kept in sync w/ {@code HoodieTableFileIndexBase}
+    HoodieInstant latestCommitInstant =
+        metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().lastInstant().get();
+
+    InputSplit[] finalSplits = fileSplits.stream()
+        .map(split -> {
+          // There are 4 types of splits we may have to handle here:
+          //    - {@code BootstrapBaseFileSplit}: in case base file does have associated bootstrap file,
+          //      but does NOT have any log files appended (convert it to {@code RealtimeBootstrapBaseFileSplit})
+          //    - {@code RealtimeBootstrapBaseFileSplit}: in case base file does have associated bootstrap file
+          //      and does have log files appended
+          //    - {@code BaseFileWithLogsSplit}: in case base file does NOT have associated bootstrap file
+          //      and does have log files appended;
+          //    - {@code FileSplit}: in case Hive passed down non-Hudi path
+          if (split instanceof RealtimeBootstrapBaseFileSplit) {
+            return split;
+          } else if (split instanceof BootstrapBaseFileSplit) {
+            BootstrapBaseFileSplit bootstrapBaseFileSplit = unsafeCast(split);
+            return createRealtimeBoostrapBaseFileSplit(
+                bootstrapBaseFileSplit,
+                metaClient.getBasePath(),
+                Collections.emptyList(),
+                latestCommitInstant.getTimestamp(),
+                false);
+          } else if (split instanceof BaseFileWithLogsSplit) {
+            BaseFileWithLogsSplit baseFileWithLogsSplit = unsafeCast(split);

Review comment:
   Yes, it's in sync. However, you brought up a very good point that the 
instant shouldn't actually be set here. This will be cleaned up in subsequent 
PRs where `HoodieRealtimeFileSplit` will be merged with `BaseWithLogFilesSplit`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [hudi] alexeykudinkin commented on a change in pull request #4556: [HUDI-3191] Removing duplicating file-listing process w/in Hive's MOR `FileInputFormat`s

2022-02-02 Thread GitBox


alexeykudinkin commented on a change in pull request #4556:
URL: https://github.com/apache/hudi/pull/4556#discussion_r798204981



##
File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java
##
@@ -65,11 +65,70 @@
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import static org.apache.hudi.TypeUtils.unsafeCast;
+import static org.apache.hudi.common.util.ValidationUtils.checkState;
+
 public class HoodieRealtimeInputFormatUtils extends HoodieInputFormatUtils {
 
   private static final Logger LOG = LogManager.getLogger(HoodieRealtimeInputFormatUtils.class);
 
-  public static InputSplit[] getRealtimeSplits(Configuration conf, Stream<FileSplit> fileSplits) {
+  public static InputSplit[] getRealtimeSplits(Configuration conf, List<FileSplit> fileSplits) throws IOException {
+    if (fileSplits.isEmpty()) {
+      return new InputSplit[0];
+    }
+
+    FileSplit fileSplit = fileSplits.get(0);
+
+    // Pre-process table-config to fetch virtual key info
+    Path partitionPath = fileSplit.getPath().getParent();
+    HoodieTableMetaClient metaClient = getTableMetaClientForBasePathUnchecked(conf, partitionPath);
+
+    Option<HoodieVirtualKeyInfo> hoodieVirtualKeyInfoOpt = getHoodieVirtualKeyInfo(metaClient);
+
+    // NOTE: This timeline is kept in sync w/ {@code HoodieTableFileIndexBase}
+    HoodieInstant latestCommitInstant =
+        metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().lastInstant().get();
+
+    InputSplit[] finalSplits = fileSplits.stream()
+        .map(split -> {
+          // There are 4 types of splits we may have to handle here:
+          //    - {@code BootstrapBaseFileSplit}: in case base file does have associated bootstrap file,
+          //      but does NOT have any log files appended (convert it to {@code RealtimeBootstrapBaseFileSplit})
+          //    - {@code RealtimeBootstrapBaseFileSplit}: in case base file does have associated bootstrap file
+          //      and does have log files appended
+          //    - {@code BaseFileWithLogsSplit}: in case base file does NOT have associated bootstrap file
+          //      and does have log files appended;
+          //    - {@code FileSplit}: in case Hive passed down non-Hudi path
+          if (split instanceof RealtimeBootstrapBaseFileSplit) {
+            return split;
+          } else if (split instanceof BootstrapBaseFileSplit) {
+            BootstrapBaseFileSplit bootstrapBaseFileSplit = unsafeCast(split);
+            return createRealtimeBoostrapBaseFileSplit(
+                bootstrapBaseFileSplit,
+                metaClient.getBasePath(),
+                Collections.emptyList(),
+                latestCommitInstant.getTimestamp(),
+                false);
+          } else if (split instanceof BaseFileWithLogsSplit) {
+            BaseFileWithLogsSplit baseFileWithLogsSplit = unsafeCast(split);

Review comment:
   It does








[GitHub] [hudi] alexeykudinkin commented on a change in pull request #4556: [HUDI-3191] Removing duplicating file-listing process w/in Hive's MOR `FileInputFormat`s

2022-02-02 Thread GitBox


alexeykudinkin commented on a change in pull request #4556:
URL: https://github.com/apache/hudi/pull/4556#discussion_r798092122



##
File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieRealtimeFileSplit.java
##
@@ -44,9 +44,7 @@
 
   private Option<HoodieVirtualKeyInfo> hoodieVirtualKeyInfo = Option.empty();
 
-  public HoodieRealtimeFileSplit() {
-    super();

Review comment:
   We don't need to remove it, but there's also no point in keeping it

##
File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java
##
@@ -144,28 +204,32 @@
     return rtSplits.toArray(new InputSplit[0]);
   }
 
+  /**
+   * @deprecated will be replaced w/ {@link #getRealtimeSplits(Configuration, List)}
+   */
   // get IncrementalRealtimeSplits
-  public static InputSplit[] getIncrementalRealtimeSplits(Configuration conf, Stream<FileSplit> fileSplits) throws IOException {
+  public static InputSplit[] getIncrementalRealtimeSplits(Configuration conf, List<FileSplit> fileSplits) throws IOException {
+    checkState(fileSplits.stream().allMatch(HoodieRealtimeInputFormatUtils::doesBelongToIncrementalQuery),
+        "All splits have to belong to incremental query");
+
     List<InputSplit> rtSplits = new ArrayList<>();
-    List<FileSplit> fileSplitList = fileSplits.collect(Collectors.toList());
-    Set<Path> partitionSet = fileSplitList.stream().map(f -> f.getPath().getParent()).collect(Collectors.toSet());
+    Set<Path> partitionSet = fileSplits.stream().map(f -> f.getPath().getParent()).collect(Collectors.toSet());
     Map<Path, HoodieTableMetaClient> partitionsToMetaClient = getTableMetaClientByPartitionPath(conf, partitionSet);
     // Pre process tableConfig from first partition to fetch virtual key info
     Option<HoodieVirtualKeyInfo> hoodieVirtualKeyInfo = Option.empty();
     if (partitionSet.size() > 0) {
       hoodieVirtualKeyInfo = getHoodieVirtualKeyInfo(partitionsToMetaClient.get(partitionSet.iterator().next()));
     }
     Option<HoodieVirtualKeyInfo> finalHoodieVirtualKeyInfo = hoodieVirtualKeyInfo;
-    fileSplitList.stream().forEach(s -> {
+    fileSplits.stream().forEach(s -> {
       // deal with incremental query.
       try {
         if (s instanceof BaseFileWithLogsSplit) {
-          BaseFileWithLogsSplit bs = (BaseFileWithLogsSplit)s;
-          if (bs.getBelongToIncrementalSplit()) {
-            rtSplits.add(new HoodieRealtimeFileSplit(bs, bs.getBasePath(), bs.getDeltaLogFiles(), bs.getMaxCommitTime(), finalHoodieVirtualKeyInfo));
-          }
+          BaseFileWithLogsSplit bs = unsafeCast(s);
+          rtSplits.add(new HoodieRealtimeFileSplit(bs, bs.getBasePath(), bs.getDeltaLogFiles(), bs.getMaxCommitTime(), finalHoodieVirtualKeyInfo));
        } else if (s instanceof RealtimeBootstrapBaseFileSplit) {
-          rtSplits.add(s);
+          RealtimeBootstrapBaseFileSplit bs = unsafeCast(s);

Review comment:
   I see now. Makes sense

##
File path: hudi-common/src/main/scala/org/apache/hudi/HoodieTableFileIndexBase.scala
##
@@ -87,6 +89,12 @@ abstract class HoodieTableFileIndexBase(engineContext: HoodieEngineContext,
 
   refresh0()
 
+  /**
+   * Returns latest completed instant as seen by this instance of the file-index
+   */
+  def latestCompletedInstant(): Option[HoodieInstant] =

Review comment:
   It's def closer to former

##
File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java
##
@@ -65,11 +65,71 @@
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import static org.apache.hudi.TypeUtils.unsafeCast;
+import static org.apache.hudi.common.util.ValidationUtils.checkState;
+
 public class HoodieRealtimeInputFormatUtils extends HoodieInputFormatUtils {
 
   private static final Logger LOG = LogManager.getLogger(HoodieRealtimeInputFormatUtils.class);
 
-  public static InputSplit[] getRealtimeSplits(Configuration conf, Stream<FileSplit> fileSplits) {
+  public static InputSplit[] getRealtimeSplits(Configuration conf, List<FileSplit> fileSplits) throws IOException {
+    if (fileSplits.isEmpty()) {
+      return new InputSplit[0];
+    }
+
+    FileSplit fileSplit = fileSplits.get(0);
+
+    // Pre-process table-config to fetch virtual key info
+    Path partitionPath = fileSplit.getPath().getParent();
+    HoodieTableMetaClient metaClient = getTableMetaClientForBasePathUnchecked(conf, partitionPath);
+
+    Option<HoodieVirtualKeyInfo> hoodieVirtualKeyInfoOpt = getHoodieVirtualKeyInfo(metaClient);
+
+    // NOTE: This timeline is kept in sync w/ {@code HoodieTableFileIndexBase}
+    HoodieInstant latestCommitInstant =
+        metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().lastInstant().get();
+
+    InputSplit[] finalSplits = fileSplits.stream()
+        .map(split -> {
+          // There are 4 types of splits we may have to handle here:
+          //    - {@code BootstrapBaseFileSplit}: in case base file does have associated bootstrap file,
+          //      but does NOT have any log files appended (convert it to {@code RealtimeBootstrapBaseFileSplit})
[GitHub] [hudi] alexeykudinkin commented on a change in pull request #4556: [HUDI-3191] Removing duplicating file-listing process w/in Hive's MOR `FileInputFormat`s

2022-02-02 Thread GitBox


alexeykudinkin commented on a change in pull request #4556:
URL: https://github.com/apache/hudi/pull/4556#discussion_r798022835



##
File path: hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java
##
@@ -516,8 +556,6 @@ void testMORTableRestore(boolean restoreAfterCompaction) throws Exception {
     JavaRDD<HoodieRecord> writeRecords = jsc().parallelize(records, 1);
     JavaRDD<WriteStatus> writeStatusJavaRDD = client.upsert(writeRecords, newCommitTime);
     client.commit(newCommitTime, writeStatusJavaRDD);
-    List<WriteStatus> statuses = writeStatusJavaRDD.collect();
-    assertNoWriteErrors(statuses);

Review comment:
   Yeah, this is a coarse hack though (just to keep the tests mostly intact). We should not actually dereference RDDs here, since that couples the tests not to the API but to the current implementation, which might change in the future.








[GitHub] [hudi] alexeykudinkin commented on a change in pull request #4556: [HUDI-3191] Removing duplicating file-listing process w/in Hive's MOR `FileInputFormat`s

2022-01-25 Thread GitBox


alexeykudinkin commented on a change in pull request #4556:
URL: https://github.com/apache/hudi/pull/4556#discussion_r791350112



##
File path: hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
##
@@ -189,8 +190,10 @@ public void testUpsertPartitioner(boolean populateMetaFields) throws Exception {
 
   assertTrue(fileIdToNewSize.entrySet().stream().anyMatch(entry -> fileIdToSize.get(entry.getKey()) < entry.getValue()));
 
-  List<String> dataFiles = roView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList());
-  List<GenericRecord> recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(hadoopConf(), dataFiles,
+  List<String> inputPaths = roView.getLatestBaseFiles()
+      .map(baseFile -> new Path(baseFile.getPath()).getParent().toString())
+      .collect(Collectors.toList());

Review comment:
   Yes, these are correct -- previously they were actually working 
correctly just b/c we did the double file-listing (w/in `getRealtimeSplits`). 
   
   I have to pass _partition paths_, not base-file paths. 
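
   For illustration, a minimal sketch of the distinction (assuming, as in the diff above, that `getLatestBaseFiles()` yields `HoodieBaseFile`s whose `getPath()` is the full base-file path; the variable names here are illustrative, not part of the PR):

       // Base-file paths point at the individual data files themselves
       List<String> baseFilePaths = roView.getLatestBaseFiles()
           .map(HoodieBaseFile::getPath)
           .collect(Collectors.toList());

       // What the realtime input format expects are the partition directories containing them
       List<String> partitionPaths = roView.getLatestBaseFiles()
           .map(baseFile -> new Path(baseFile.getPath()).getParent().toString()) // drop the file name, keep the partition dir
           .distinct()
           .collect(Collectors.toList());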

##
File path: hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java
##
@@ -166,10 +169,12 @@ void testRollbackWithDeltaAndCompactionCommit(boolean rollbackUsingMarkers) throws Exception {
   JavaRDD<HoodieRecord> writeRecords = jsc().parallelize(records, 1);
 
   JavaRDD<WriteStatus> writeStatusJavaRDD = client.upsert(writeRecords, newCommitTime);
-  client.commit(newCommitTime, writeStatusJavaRDD);
+
   List<WriteStatus> statuses = writeStatusJavaRDD.collect();
   assertNoWriteErrors(statuses);
 
+  client.commit(newCommitTime, jsc().parallelize(statuses));
+

Review comment:
   These tests are actually written incorrectly -- they're dereferencing RDDs twice: w/in `commit` and when they collect w/in the test itself. This leads to the same base-files being double-written, which in turn fails the assertion I currently put in place to make sure that the legacy flow and the new one yield identical results.
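
   For reference, a minimal sketch of the corrected ordering this change introduces (same names as in the diff above; `client` is the Spark write client used by the test and `jsc()` its JavaSparkContext):

       JavaRDD<WriteStatus> writeStatusJavaRDD = client.upsert(writeRecords, newCommitTime);

       // Materialize the upsert result exactly once
       List<WriteStatus> statuses = writeStatusJavaRDD.collect();
       assertNoWriteErrors(statuses);

       // Commit from the already-collected statuses instead of the original RDD,
       // so the upsert is not re-executed (which previously double-wrote base files)
       client.commit(newCommitTime, jsc().parallelize(statuses));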

##
File path: hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java
##
@@ -201,8 +206,10 @@ void testRollbackWithDeltaAndCompactionCommit(boolean rollbackUsingMarkers) throws Exception {
 copyOfRecords = dataGen.generateUpdates(commitTime1, copyOfRecords);
 copyOfRecords.addAll(dataGen.generateInserts(commitTime1, 200));
 
-List<String> dataFiles = tableView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList());
-List<GenericRecord> recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(hadoopConf(), dataFiles,
+List<String> inputPaths = tableView.getLatestBaseFiles()
+    .map(baseFile -> new Path(baseFile.getPath()).getParent().toString())
+    .collect(Collectors.toList());

Review comment:
   Preserving existing behavior

##
File path: hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java
##
@@ -201,8 +206,10 @@ void testRollbackWithDeltaAndCompactionCommit(boolean rollbackUsingMarkers) throws Exception {
 copyOfRecords = dataGen.generateUpdates(commitTime1, copyOfRecords);
 copyOfRecords.addAll(dataGen.generateInserts(commitTime1, 200));
 
-List<String> dataFiles = tableView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList());
-List<GenericRecord> recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(hadoopConf(), dataFiles,
+List<String> inputPaths = tableView.getLatestBaseFiles()
+    .map(baseFile -> new Path(baseFile.getPath()).getParent().toString())
+    .collect(Collectors.toList());

Review comment:
   Preserving existing behavior (hence keeping lists instead of Sets)

##
File path: hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java
##
@@ -268,11 +281,13 @@ void testRollbackWithDeltaAndCompactionCommit(boolean rollbackUsingMarkers) throws Exception {
 thirdClient.startCommitWithTime(newCommitTime);
 
 writeStatusJavaRDD = thirdClient.upsert(writeRecords, newCommitTime);
+
 statuses = writeStatusJavaRDD.collect();
-thirdClient.commit(newCommitTime, writeStatusJavaRDD);
 // Verify there are no errors
 assertNoWriteErrors(statuses);
 
+thirdClient.commit(newCommitTime, jsc().parallelize(statuses));
+

Review comment:
   Replied above

##
File path: hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java
##
@@ -361,15 +386,19 @@ void testMultiRollbackWithDeltaAndCompactionCommit() throws Exception {
 copyOfRecords = dataGen.generateUpdates(newCommitTime, copyOfRecords);
 copyOfRecords.addAll(dataGen.generateInserts(newCommitTime, 200));

[GitHub] [hudi] alexeykudinkin commented on a change in pull request #4556: [HUDI-3191] Removing duplicating file-listing process w/in Hive's MOR `FileInputFormat`s

2022-01-24 Thread GitBox


alexeykudinkin commented on a change in pull request #4556:
URL: https://github.com/apache/hudi/pull/4556#discussion_r791356926



##
File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieFileInputFormatBase.java
##
@@ -143,6 +121,124 @@ private static FileStatus getFileStatusUnchecked(Option<HoodieBaseFile> baseFile
     return returns.toArray(new FileStatus[0]);
   }
 
+  private void validate(List<FileStatus> targetFiles, List<FileStatus> legacyFileStatuses) {

Review comment:
   Moved methods aren't actually changing in here

##
File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieFileInputFormatBase.java
##
@@ -143,6 +121,127 @@ private static FileStatus getFileStatusUnchecked(Option<HoodieBaseFile> baseFile
     return returns.toArray(new FileStatus[0]);
   }
 
+  private void validate(List<FileStatus> targetFiles, List<FileStatus> legacyFileStatuses) {
+    List<FileStatus> diff = CollectionUtils.diff(targetFiles, legacyFileStatuses);
+    checkState(diff.isEmpty(), "Should be empty");
+  }
+
+  @Nonnull
+  private static FileStatus getFileStatusUnchecked(HoodieBaseFile baseFile) {
+    try {
+      return HoodieInputFormatUtils.getFileStatus(baseFile);
+    } catch (IOException ioe) {
+      throw new HoodieIOException("Failed to get file-status", ioe);
+    }
+  }
+
+  /**
+   * Abstracts and exposes {@link FileInputFormat#listStatus(JobConf)} operation to subclasses that
+   * lists files (returning an array of {@link FileStatus}) corresponding to the input paths specified
+   * as part of provided {@link JobConf}
+   */
+  protected final FileStatus[] doListStatus(JobConf job) throws IOException {
+    return super.listStatus(job);
+  }
+
+  /**
+   * Achieves listStatus functionality for an incrementally queried table. Instead of listing all
+   * partitions and then filtering based on the commits of interest, this logic first extracts the
+   * partitions touched by the desired commits and then lists only those partitions.
+   */
+  protected List<FileStatus> listStatusForIncrementalMode(JobConf job,
+                                                          HoodieTableMetaClient tableMetaClient,
+                                                          List<Path> inputPaths,
+                                                          String incrementalTable) throws IOException {
+    String tableName = tableMetaClient.getTableConfig().getTableName();
+    Job jobContext = Job.getInstance(job);
+    Option<HoodieTimeline> timeline = HoodieInputFormatUtils.getFilteredCommitsTimeline(jobContext, tableMetaClient);
+    if (!timeline.isPresent()) {
+      return null;
+    }
+    Option<List<HoodieInstant>> commitsToCheck = HoodieInputFormatUtils.getCommitsForIncrementalQuery(jobContext, tableName, timeline.get());
+    if (!commitsToCheck.isPresent()) {
+      return null;
+    }
+    Option<String> incrementalInputPaths = HoodieInputFormatUtils.getAffectedPartitions(commitsToCheck.get(), tableMetaClient, timeline.get(), inputPaths);
+    // Mutate the JobConf to set the input paths to only partitions touched by incremental pull.
+    if (!incrementalInputPaths.isPresent()) {
+      return null;
+    }
+    setInputPaths(job, incrementalInputPaths.get());
+    FileStatus[] fileStatuses = doListStatus(job);
+    return HoodieInputFormatUtils.filterIncrementalFileStatus(jobContext, tableMetaClient, timeline.get(), fileStatuses, commitsToCheck.get());
+  }
+
+  protected abstract boolean includeLogFilesForSnapshotView();
+
+  @Nonnull
+  private static RealtimeFileStatus createRealtimeFileStatusUnchecked(HoodieBaseFile baseFile,
+                                                                      Stream<HoodieLogFile> logFiles,
+                                                                      Option<HoodieInstant> latestCompletedInstantOpt,
+                                                                      HoodieTableMetaClient tableMetaClient) {
+    List<HoodieLogFile> sortedLogFiles = logFiles.sorted(HoodieLogFile.getLogFileComparator()).collect(Collectors.toList());
+    FileStatus baseFileStatus = getFileStatusUnchecked(baseFile);
+    try {
+      RealtimeFileStatus rtFileStatus = new RealtimeFileStatus(baseFileStatus);
+      rtFileStatus.setDeltaLogFiles(sortedLogFiles);
+      rtFileStatus.setBaseFilePath(baseFile.getPath());
+      rtFileStatus.setBasePath(tableMetaClient.getBasePath());
+
+      if (latestCompletedInstantOpt.isPresent()) {
+        HoodieInstant latestCompletedInstant = latestCompletedInstantOpt.get();
+        checkState(latestCompletedInstant.isCompleted());
+
+        rtFileStatus.setMaxCommitTime(latestCompletedInstant.getTimestamp());
+      }
+
+      if (baseFileStatus instanceof LocatedFileStatusWithBootstrapBaseFile || baseFileStatus instanceof FileStatusWithBootstrapBaseFile) {
+        rtFileStatus.setBootStrapFileStatus(baseFileStatus);
+      }
+
+      return rtFileStatus;
+    } catch (IOException e) {
+      throw new HoodieIOException(String.format("Failed to init %s", RealtimeFileStatus.class.getSimpleName()), e);
+    }
+  }
+
+  @Nonnull
+ 

[GitHub] [hudi] alexeykudinkin commented on a change in pull request #4556: [HUDI-3191] Removing duplicating file-listing process w/in Hive's MOR `FileInputFormat`s

2022-01-24 Thread GitBox


alexeykudinkin commented on a change in pull request #4556:
URL: https://github.com/apache/hudi/pull/4556#discussion_r791355209



##
File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/BootstrapBaseFileSplit.java
##
@@ -32,10 +32,6 @@
 
   private FileSplit bootstrapFileSplit;
 
-  public BootstrapBaseFileSplit() {
-    super();
-  }

Review comment:
   Yeah, this isn't used outside of our codebase








[GitHub] [hudi] alexeykudinkin commented on a change in pull request #4556: [HUDI-3191] Removing duplicating file-listing process w/in Hive's MOR `FileInputFormat`s

2022-01-24 Thread GitBox


alexeykudinkin commented on a change in pull request #4556:
URL: https://github.com/apache/hudi/pull/4556#discussion_r791354846



##
File path: hudi-common/src/main/scala/org/apache/hudi/HoodieTableFileIndexBase.scala
##
@@ -87,6 +89,12 @@ abstract class HoodieTableFileIndexBase(engineContext: HoodieEngineContext,
 
   refresh0()
 
+  /**
+   * Returns latest completed instant as seen by this instance of the file-index
+   */
+  def latestCompletedInstant(): Option[HoodieInstant] =

Review comment:
   I think this is the approach we do take in Java, but in Scala this seems to be off (since you don't add parens at the end of the method name, the get-prefix feels off).








[GitHub] [hudi] alexeykudinkin commented on a change in pull request #4556: [HUDI-3191] Removing duplicating file-listing process w/in Hive's MOR `FileInputFormat`s

2022-01-24 Thread GitBox


alexeykudinkin commented on a change in pull request #4556:
URL: https://github.com/apache/hudi/pull/4556#discussion_r791354207



##
File path: hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java
##
@@ -361,15 +386,19 @@ void testMultiRollbackWithDeltaAndCompactionCommit() throws Exception {
 copyOfRecords = dataGen.generateUpdates(newCommitTime, copyOfRecords);
 copyOfRecords.addAll(dataGen.generateInserts(newCommitTime, 200));
 
-List<String> dataFiles = tableView.getLatestBaseFiles().map(hf -> hf.getPath()).collect(Collectors.toList());
+List<String> dataFiles = tableView.getLatestBaseFiles()
+    .map(baseFile -> new Path(baseFile.getPath()).getParent().toString())
+    .collect(Collectors.toList());
 List<GenericRecord> recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(hadoopConf(), dataFiles,
     basePath());
 assertEquals(200, recordsRead.size());
 
 statuses = nClient.upsert(jsc().parallelize(copyOfRecords, 1), newCommitTime).collect();
 // Verify there are no errors
 assertNoWriteErrors(statuses);
-nClient.commit(newCommitTime, writeStatusJavaRDD);
+
+nClient.commit(newCommitTime, jsc().parallelize(statuses));
+

Review comment:
   These are not cosmetic -- these are fixes for real test failures (they fix the double-writing of base files).








[GitHub] [hudi] alexeykudinkin commented on a change in pull request #4556: [HUDI-3191] Removing duplicating file-listing process w/in Hive's MOR `FileInputFormat`s

2022-01-24 Thread GitBox


alexeykudinkin commented on a change in pull request #4556:
URL: https://github.com/apache/hudi/pull/4556#discussion_r791350725



##
File path: hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java
##
@@ -201,8 +206,10 @@ void testRollbackWithDeltaAndCompactionCommit(boolean rollbackUsingMarkers) throws Exception {
 copyOfRecords = dataGen.generateUpdates(commitTime1, copyOfRecords);
 copyOfRecords.addAll(dataGen.generateInserts(commitTime1, 200));
 
-List<String> dataFiles = tableView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList());
-List<GenericRecord> recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(hadoopConf(), dataFiles,
+List<String> inputPaths = tableView.getLatestBaseFiles()
+    .map(baseFile -> new Path(baseFile.getPath()).getParent().toString())
+    .collect(Collectors.toList());

Review comment:
   Preserving existing behavior (hence keeping lists instead of Sets)

##
File path: hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java
##
@@ -268,11 +281,13 @@ void testRollbackWithDeltaAndCompactionCommit(boolean rollbackUsingMarkers) throws Exception {
 thirdClient.startCommitWithTime(newCommitTime);
 
 writeStatusJavaRDD = thirdClient.upsert(writeRecords, newCommitTime);
+
 statuses = writeStatusJavaRDD.collect();
-thirdClient.commit(newCommitTime, writeStatusJavaRDD);
 // Verify there are no errors
 assertNoWriteErrors(statuses);
 
+thirdClient.commit(newCommitTime, jsc().parallelize(statuses));
+

Review comment:
   Replied above








[GitHub] [hudi] alexeykudinkin commented on a change in pull request #4556: [HUDI-3191] Removing duplicating file-listing process w/in Hive's MOR `FileInputFormat`s

2022-01-24 Thread GitBox


alexeykudinkin commented on a change in pull request #4556:
URL: https://github.com/apache/hudi/pull/4556#discussion_r791350725



##
File path: hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java
##
@@ -201,8 +206,10 @@ void testRollbackWithDeltaAndCompactionCommit(boolean rollbackUsingMarkers) throws Exception {
 copyOfRecords = dataGen.generateUpdates(commitTime1, copyOfRecords);
 copyOfRecords.addAll(dataGen.generateInserts(commitTime1, 200));
 
-List<String> dataFiles = tableView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList());
-List<GenericRecord> recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(hadoopConf(), dataFiles,
+List<String> inputPaths = tableView.getLatestBaseFiles()
+    .map(baseFile -> new Path(baseFile.getPath()).getParent().toString())
+    .collect(Collectors.toList());

Review comment:
   Preserving existing behavior








[GitHub] [hudi] alexeykudinkin commented on a change in pull request #4556: [HUDI-3191] Removing duplicating file-listing process w/in Hive's MOR `FileInputFormat`s

2022-01-24 Thread GitBox


alexeykudinkin commented on a change in pull request #4556:
URL: https://github.com/apache/hudi/pull/4556#discussion_r791350598



##
File path: hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java
##
@@ -166,10 +169,12 @@ void testRollbackWithDeltaAndCompactionCommit(boolean rollbackUsingMarkers) throws Exception {
   JavaRDD<HoodieRecord> writeRecords = jsc().parallelize(records, 1);
 
   JavaRDD<WriteStatus> writeStatusJavaRDD = client.upsert(writeRecords, newCommitTime);
-  client.commit(newCommitTime, writeStatusJavaRDD);
+
   List<WriteStatus> statuses = writeStatusJavaRDD.collect();
   assertNoWriteErrors(statuses);
 
+  client.commit(newCommitTime, jsc().parallelize(statuses));
+

Review comment:
   These tests are actually written incorrectly -- they're dereferencing RDDs twice: w/in `commit` and when they collect w/in the test itself. This leads to the same base-files being double-written, which in turn fails the assertion I currently put in place to make sure that the legacy flow and the new one yield identical results.








[GitHub] [hudi] alexeykudinkin commented on a change in pull request #4556: [HUDI-3191] Removing duplicating file-listing process w/in Hive's MOR `FileInputFormat`s

2022-01-24 Thread GitBox


alexeykudinkin commented on a change in pull request #4556:
URL: https://github.com/apache/hudi/pull/4556#discussion_r791350112



##
File path: hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
##
@@ -189,8 +190,10 @@ public void testUpsertPartitioner(boolean populateMetaFields) throws Exception {
 
   assertTrue(fileIdToNewSize.entrySet().stream().anyMatch(entry -> fileIdToSize.get(entry.getKey()) < entry.getValue()));
 
-  List<String> dataFiles = roView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList());
-  List<GenericRecord> recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(hadoopConf(), dataFiles,
+  List<String> inputPaths = roView.getLatestBaseFiles()
+      .map(baseFile -> new Path(baseFile.getPath()).getParent().toString())
+      .collect(Collectors.toList());

Review comment:
   Yes, these are correct -- previously they were actually working 
correctly just b/c we did the double file-listing (w/in `getRealtimeSplits`). 
   
   I have to pass _partition paths_, not base-file paths. 



