[GitHub] [hudi] xiarixiaoyao commented on a change in pull request #3203: [HUDI-2086] Refactor hive mor_incremental_view

2021-11-02 Thread GitBox


xiarixiaoyao commented on a change in pull request #3203:
URL: https://github.com/apache/hudi/pull/3203#discussion_r740695703



##
File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java
##
@@ -161,6 +148,82 @@
     return rtSplits.toArray(new InputSplit[0]);
   }
 
+  // get IncrementalRealtimeSplits
+  public static InputSplit[] getIncrementalRealtimeSplits(Configuration conf, Stream<FileSplit> fileSplits) throws IOException {
+    List<InputSplit> rtSplits = new ArrayList<>();
+    List<FileSplit> fileSplitList = fileSplits.collect(Collectors.toList());
+    Set<Path> partitionSet = fileSplitList.stream().map(f -> f.getPath().getParent()).collect(Collectors.toSet());
+    Map<Path, HoodieTableMetaClient> partitionsToMetaClient = getTableMetaClientByPartitionPath(conf, partitionSet);
+    // Pre process tableConfig from first partition to fetch virtual key info
+    Option<HoodieVirtualKeyInfo> hoodieVirtualKeyInfo = Option.empty();
+    if (partitionSet.size() > 0) {
+      hoodieVirtualKeyInfo = getHoodieVirtualKeyInfo(partitionsToMetaClient.get(partitionSet.iterator().next()));
+    }
+    Option<HoodieVirtualKeyInfo> finalHoodieVirtualKeyInfo = hoodieVirtualKeyInfo;
+    fileSplitList.stream().forEach(s -> {
+      // deal with incremental query.
+      try {
+        if (s instanceof BaseFileWithLogsSplit) {
+          BaseFileWithLogsSplit bs = (BaseFileWithLogsSplit) s;
+          if (bs.getBelongToIncrementalSplit()) {
+            rtSplits.add(new HoodieRealtimeFileSplit(bs, bs.getBasePath(), bs.getDeltaLogPaths(), bs.getMaxCommitTime(), finalHoodieVirtualKeyInfo));
+          }
+        } else if (s instanceof RealtimeBootstrapBaseFileSplit) {
+          rtSplits.add(s);
+        }
+      } catch (IOException e) {
+        throw new HoodieIOException("Error creating hoodie real time split ", e);
+      }
+    });
+    LOG.info("Returning a total splits of " + rtSplits.size());
+    return rtSplits.toArray(new InputSplit[0]);
+  }
+
+  public static Option<HoodieVirtualKeyInfo> getHoodieVirtualKeyInfo(HoodieTableMetaClient metaClient) {
+    HoodieTableConfig tableConfig = metaClient.getTableConfig();
+    if (!tableConfig.populateMetaFields()) {
+      TableSchemaResolver tableSchemaResolver = new TableSchemaResolver(metaClient);
+      try {
+        MessageType parquetSchema = tableSchemaResolver.getTableParquetSchema();
+        return Option.of(new HoodieVirtualKeyInfo(tableConfig.getRecordKeyFieldProp(),
+            tableConfig.getPartitionFieldProp(), parquetSchema.getFieldIndex(tableConfig.getRecordKeyFieldProp()),
+            parquetSchema.getFieldIndex(tableConfig.getPartitionFieldProp())));
+      } catch (Exception exception) {
+        throw new HoodieException("Fetching table schema failed with exception ", exception);
+      }
+    }
+    return Option.empty();
+  }
+
+  public static boolean isIncrementalQuerySplits(List<FileSplit> fileSplits) {
+    if (fileSplits == null || fileSplits.size() == 0) {
+      return false;
+    }
+    return fileSplits.stream().anyMatch(s -> {
+      if (s instanceof BaseFileWithLogsSplit) {
+        BaseFileWithLogsSplit bs = (BaseFileWithLogsSplit) s;
+        return bs.getBelongToIncrementalSplit();
+      } else {
+        return s instanceof RealtimeBootstrapBaseFileSplit;
+      }
+    });
+  }
+
+  public static RealtimeBootstrapBaseFileSplit createRealimeBootstrapBaseFileSplit(

Review comment:
   ok

[GitHub] [hudi] xiarixiaoyao commented on a change in pull request #3203: [HUDI-2086] Refactor hive mor_incremental_view

2021-11-01 Thread GitBox


xiarixiaoyao commented on a change in pull request #3203:
URL: https://github.com/apache/hudi/pull/3203#discussion_r740697734



##
File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java
##
@@ -61,9 +85,183 @@
   @Override
   public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
 
-    Stream<FileSplit> fileSplits = Arrays.stream(super.getSplits(job, numSplits)).map(is -> (FileSplit) is);
+    List<FileSplit> fileSplits = Arrays.stream(super.getSplits(job, numSplits)).map(is -> (FileSplit) is).collect(Collectors.toList());
+
+    boolean isIncrementalSplits = HoodieRealtimeInputFormatUtils.isIncrementalQuerySplits(fileSplits);
+
+    return isIncrementalSplits
+        ? HoodieRealtimeInputFormatUtils.getIncrementalRealtimeSplits(job, fileSplits.stream())
+        : HoodieRealtimeInputFormatUtils.getRealtimeSplits(job, fileSplits.stream());
+  }
+
+  /**
+   * Keep the logic of mor_incr_view the same as the spark datasource.
+   * Step1: Get the list of commits to be fetched based on the start commit and max commits (for snapshot, max commits is -1).
+   * Step2: Get the list of affected file statuses for those commits.
+   * Step3: Construct a HoodieTableFileSystemView based on those affected file statuses.
+   *        a. Filter affected partitions based on inputPaths.
+   *        b. Get the list of fileGroups for the affected partitions via fsView.getAllFileGroups.
+   * Step4: Set input paths based on the filtered affected partition paths; this changes the original input paths passed to
+   *        this method. Some partitions did not have commits in the trimmed-down list of commits, hence we need this step.
+   * Step5: Find candidate fileStatus, since when we get baseFileStatus from HoodieTableFileSystemView,
+   *        the BaseFileStatus will be missing file size information.
+   *        We should use the candidate fileStatus to update the size information for BaseFileStatus.
+   * Step6: For every file group from step3(b):
+   *        Get the 1st available base file from all file slices, then use the candidate file status to update the baseFileStatus,
+   *        and construct a RealTimeFileStatus and add it to the result along with log files.
+   *        If the file group has only log files, construct a RealTimeFileStatus and add it to the result.
+   * TODO: unify the incremental view code between hive/spark-sql and spark datasource
+   */
+  @Override
+  protected List<FileStatus> listStatusForIncrementalMode(
+      JobConf job, HoodieTableMetaClient tableMetaClient, List<Path> inputPaths) throws IOException {
+    List<FileStatus> result = new ArrayList<>();
+    String tableName = tableMetaClient.getTableConfig().getTableName();
+    Job jobContext = Job.getInstance(job);
+
+    // step1
+    Option<HoodieTimeline> timeline = HoodieInputFormatUtils.getFilteredCommitsTimeline(jobContext, tableMetaClient);
+    if (!timeline.isPresent()) {
+      return result;
+    }
+    String lastIncrementalTs = HoodieHiveUtils.readStartCommitTime(jobContext, tableName);
+    // Total number of commits to return in this batch. Set this to -1 to get all the commits.
+    Integer maxCommits = HoodieHiveUtils.readMaxCommits(jobContext, tableName);
+    HoodieTimeline commitsTimelineToReturn = timeline.get().findInstantsAfter(lastIncrementalTs, maxCommits);
+    Option<List<HoodieInstant>> commitsToCheck = Option.of(commitsTimelineToReturn.getInstants().collect(Collectors.toList()));
+    if (!commitsToCheck.isPresent()) {
+      return result;
+    }
+    // step2
+    commitsToCheck.get().sort(HoodieInstant::compareTo);
+    List<HoodieCommitMetadata> metadataList = commitsToCheck
+        .get().stream().map(instant -> {
+          try {
+            return HoodieInputFormatUtils.getCommitMetadata(instant, commitsTimelineToReturn);
+          } catch (IOException e) {
+            throw new HoodieException(String.format("cannot get metadata for instant: %s", instant));
+          }
+        }).collect(Collectors.toList());
+
+    // build fileGroup from fsView
+    List<FileStatus> affectedFileStatus = Arrays.asList(HoodieInputFormatUtils
+        .listAffectedFilesForCommits(new Path(tableMetaClient.getBasePath()), metadataList));
+    // step3
+    HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(tableMetaClient, commitsTimelineToReturn, affectedFileStatus.toArray(new FileStatus[0]));
+    // build fileGroup from fsView
+    Path basePath = new Path(tableMetaClient.getBasePath());
+    // filter affectedPartition by inputPaths
+    List<String> affectedPartition = HoodieInputFormatUtils.getWritePartitionPaths(metadataList).stream()
+        .filter(k -> k.isEmpty() ? inputPaths.contains(basePath) : inputPaths.contains(new Path(basePath, k))).collect(Collectors.toList());
+    if (affectedPartition.isEmpty()) {
+      return result;
+    }
+    List<HoodieFileGroup> fileGroups = affectedPartition.stream()

Review comment:
   L148 has already added all files to the HoodieTableFileSystemView, and partitionToFileGroupsMap will be filled and cached.
   L1

[GitHub] [hudi] xiarixiaoyao commented on a change in pull request #3203: [HUDI-2086] Refactor hive mor_incremental_view

2021-11-01 Thread GitBox


xiarixiaoyao commented on a change in pull request #3203:
URL: https://github.com/apache/hudi/pull/3203#discussion_r740696192



##
File path: hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java
##
@@ -535,4 +548,211 @@ public void testSchemaEvolutionAndRollbackBlockInLastLogFile(ExternalSpillableMa
         arguments(ExternalSpillableMap.DiskMapType.ROCKS_DB, true, true)
     );
   }
+
+  @Test
+  public void testIncremetalWithOnlylog() throws Exception {
+    // initial commit
+    Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getEvolvedSchema());
+    HoodieTestUtils.init(hadoopConf, basePath.toString(), HoodieTableType.MERGE_ON_READ);
+    String instantTime = "100";
+    final int numRecords = 1000;
+    File partitionDir = InputFormatTestUtil.prepareParquetTable(basePath, schema, 1, numRecords, instantTime,
+        HoodieTableType.MERGE_ON_READ);
+    //FileCreateUtils.createDeltaCommit(basePath.toString(), instantTime);
+    createDeltaCommitFile(basePath, instantTime, "2016/05/01", "2016/05/01/fileid0_1-0-1_100.parquet", "fileid0");
+    // Add the paths
+    FileInputFormat.setInputPaths(baseJobConf, partitionDir.getPath());
+
+    // insert new records to log file
+    try {
+      String newCommitTime = "102";
+      HoodieLogFormat.Writer writer =
+          InputFormatTestUtil.writeDataBlockToLogFile(partitionDir, fs, schema, "fileid0", instantTime, newCommitTime,
+              numRecords, numRecords, 0);
+      writer.close();
+      createDeltaCommitFile(basePath, newCommitTime, "2016/05/01", "2016/05/01/.fileid0_100.log.1_1-0-1", "fileid0");
+
+      InputFormatTestUtil.setupIncremental(baseJobConf, "101", 1);
+
+      HoodieParquetRealtimeInputFormat inputFormat = new HoodieParquetRealtimeInputFormat();
+      inputFormat.setConf(baseJobConf);
+      InputSplit[] splits = inputFormat.getSplits(baseJobConf, 1);
+      assertTrue(splits.length == 1);
+      JobConf newJobConf = new JobConf(baseJobConf);
+      List<Schema.Field> fields = schema.getFields();
+      setHiveColumnNameProps(fields, newJobConf, false);
+      RecordReader<NullWritable, ArrayWritable> reader = inputFormat.getRecordReader(splits[0], newJobConf, Reporter.NULL);
+      // use reader to read log file.
+      NullWritable key = reader.createKey();
+      ArrayWritable value = reader.createValue();
+      while (reader.next(key, value)) {
+        Writable[] values = value.get();
+        // check that the record read carries the latest commit time (newCommitTime)
+        assertEquals(newCommitTime, values[0].toString());
+        key = reader.createKey();
+        value = reader.createValue();
+      }
+      reader.close();
+    } catch (IOException e) {
+      throw new HoodieException(e.getMessage(), e);
+    }
+  }
+
+  @Test
+  public void testIncremetalWithReplace() throws Exception {

Review comment:
   yes, will add a test case for compaction.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [hudi] xiarixiaoyao commented on a change in pull request #3203: [HUDI-2086] Refactor hive mor_incremental_view

2021-11-01 Thread GitBox


xiarixiaoyao commented on a change in pull request #3203:
URL: https://github.com/apache/hudi/pull/3203#discussion_r740695822



##
File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java
##
@@ -61,9 +85,183 @@
   @Override
   public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
 
-    Stream<FileSplit> fileSplits = Arrays.stream(super.getSplits(job, numSplits)).map(is -> (FileSplit) is);
+    List<FileSplit> fileSplits = Arrays.stream(super.getSplits(job, numSplits)).map(is -> (FileSplit) is).collect(Collectors.toList());
+
+    boolean isIncrementalSplits = HoodieRealtimeInputFormatUtils.isIncrementalQuerySplits(fileSplits);
+
+    return isIncrementalSplits
+        ? HoodieRealtimeInputFormatUtils.getIncrementalRealtimeSplits(job, fileSplits.stream())
+        : HoodieRealtimeInputFormatUtils.getRealtimeSplits(job, fileSplits.stream());
+  }
+
+  /**
+   * Keep the logic of mor_incr_view the same as the spark datasource.
+   * Step1: Get the list of commits to be fetched based on the start commit and max commits (for snapshot, max commits is -1).
+   * Step2: Get the list of affected file statuses for those commits.
+   * Step3: Construct a HoodieTableFileSystemView based on those affected file statuses.
+   *        a. Filter affected partitions based on inputPaths.
+   *        b. Get the list of fileGroups for the affected partitions via fsView.getAllFileGroups.
+   * Step4: Set input paths based on the filtered affected partition paths; this changes the original input paths passed to
+   *        this method. Some partitions did not have commits in the trimmed-down list of commits, hence we need this step.
+   * Step5: Find candidate fileStatus, since when we get baseFileStatus from HoodieTableFileSystemView,
+   *        the BaseFileStatus will be missing file size information.
+   *        We should use the candidate fileStatus to update the size information for BaseFileStatus.
+   * Step6: For every file group from step3(b):
+   *        Get the 1st available base file from all file slices, then use the candidate file status to update the baseFileStatus,
+   *        and construct a RealTimeFileStatus and add it to the result along with log files.
+   *        If the file group has only log files, construct a RealTimeFileStatus and add it to the result.
+   * TODO: unify the incremental view code between hive/spark-sql and spark datasource
+   */
+  @Override
+  protected List<FileStatus> listStatusForIncrementalMode(
+      JobConf job, HoodieTableMetaClient tableMetaClient, List<Path> inputPaths) throws IOException {
+    List<FileStatus> result = new ArrayList<>();
+    String tableName = tableMetaClient.getTableConfig().getTableName();
+    Job jobContext = Job.getInstance(job);
+
+    // step1
+    Option<HoodieTimeline> timeline = HoodieInputFormatUtils.getFilteredCommitsTimeline(jobContext, tableMetaClient);
+    if (!timeline.isPresent()) {
+      return result;
+    }
+    String lastIncrementalTs = HoodieHiveUtils.readStartCommitTime(jobContext, tableName);
+    // Total number of commits to return in this batch. Set this to -1 to get all the commits.
+    Integer maxCommits = HoodieHiveUtils.readMaxCommits(jobContext, tableName);

Review comment:
   good suggestion, thanks








[GitHub] [hudi] xiarixiaoyao commented on a change in pull request #3203: [HUDI-2086] Refactor hive mor_incremental_view

2021-10-27 Thread GitBox


xiarixiaoyao commented on a change in pull request #3203:
URL: https://github.com/apache/hudi/pull/3203#discussion_r738013103



##
File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java
##
@@ -161,6 +148,110 @@
     return rtSplits.toArray(new InputSplit[0]);
   }
 
+  // get IncrementalRealtimeSplits
+  public static InputSplit[] getIncrementalRealtimeSplits(Configuration conf, Stream<FileSplit> fileSplits) throws IOException {
+    List<InputSplit> rtSplits = new ArrayList<>();
+    List<FileSplit> fileSplitList = fileSplits.collect(Collectors.toList());
+    Set<Path> partitionSet = fileSplitList.stream().map(f -> f.getPath().getParent()).collect(Collectors.toSet());
+    Map<Path, HoodieTableMetaClient> partitionsToMetaClient = getTableMetaClientByPartitionPath(conf, partitionSet);
+    // Pre process tableConfig from first partition to fetch virtual key info
+    Option<HoodieVirtualKeyInfo> hoodieVirtualKeyInfo = Option.empty();
+    if (partitionSet.size() > 0) {
+      hoodieVirtualKeyInfo = getHoodieVirtualKeyInfo(partitionsToMetaClient.get(partitionSet.iterator().next()));
+    }
+    Option<HoodieVirtualKeyInfo> finalHoodieVirtualKeyInfo = hoodieVirtualKeyInfo;
+    fileSplitList.stream().forEach(s -> {
+      // deal with incremental query.
+      try {
+        if (s instanceof BaseFileWithLogsSplit) {
+          BaseFileWithLogsSplit bs = (BaseFileWithLogsSplit) s;
+          if (bs.getBelongToIncrementalSplit()) {
+            rtSplits.add(new HoodieRealtimeFileSplit(bs, bs.getBasePath(), bs.getDeltaLogPaths(), bs.getMaxCommitTime(), finalHoodieVirtualKeyInfo));
+          }
+        } else if (s instanceof RealtimeBootstrapBaseFileSplit) {
+          rtSplits.add(s);
+        }
+      } catch (IOException e) {
+        throw new HoodieIOException("Error creating hoodie real time split ", e);
+      }
+    });
+    LOG.info("Returning a total splits of " + rtSplits.size());
+    return rtSplits.toArray(new InputSplit[0]);
+  }
+
+  public static Option<HoodieVirtualKeyInfo> getHoodieVirtualKeyInfo(HoodieTableMetaClient metaClient) {
+    HoodieTableConfig tableConfig = metaClient.getTableConfig();
+    if (!tableConfig.populateMetaFields()) {
+      TableSchemaResolver tableSchemaResolver = new TableSchemaResolver(metaClient);
+      try {
+        MessageType parquetSchema = tableSchemaResolver.getTableParquetSchema();
+        return Option.of(new HoodieVirtualKeyInfo(tableConfig.getRecordKeyFieldProp(),
+            tableConfig.getPartitionFieldProp(), parquetSchema.getFieldIndex(tableConfig.getRecordKeyFieldProp()),
+            parquetSchema.getFieldIndex(tableConfig.getPartitionFieldProp())));
+      } catch (Exception exception) {
+        throw new HoodieException("Fetching table schema failed with exception ", exception);
+      }
+    }
+    return Option.empty();
+  }
+
+  public static boolean isIncrementalQuerySplits(List<FileSplit> fileSplits) {
+    if (fileSplits == null || fileSplits.size() == 0) {
+      return false;
+    }
+    return fileSplits.stream().anyMatch(s -> {
+      if (s instanceof BaseFileWithLogsSplit) {
+        BaseFileWithLogsSplit bs = (BaseFileWithLogsSplit) s;
+        if (bs.getBelongToIncrementalSplit()) {
+          return true;
+        }
+      } else if (s instanceof RealtimeBootstrapBaseFileSplit) {
+        return true;
+      }
+      return false;
+    });
+  }
+
+  // Pick all incremental files and add them to rtSplits then filter out those files.
+  private static Map<Path, List<FileSplit>> filterOutIncrementalSplits(
+      List<FileSplit> fileSplitList,

Review comment:
   sorry, will update.








[GitHub] [hudi] xiarixiaoyao commented on a change in pull request #3203: [HUDI-2086] Refactor hive mor_incremental_view

2021-10-26 Thread GitBox


xiarixiaoyao commented on a change in pull request #3203:
URL: https://github.com/apache/hudi/pull/3203#discussion_r736332305



##
File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java
##
@@ -161,6 +162,46 @@
     return rtSplits.toArray(new InputSplit[0]);
   }
 
+  // pick all incremental files and add them to rtSplits then filter out those files.
+  private static Map<Path, List<FileSplit>> filterOutIncrementalSplits(
+      List<FileSplit> fileSplitList,

Review comment:
   good, let me try it.








[GitHub] [hudi] xiarixiaoyao commented on a change in pull request #3203: [HUDI-2086] Refactor hive mor_incremental_view

2021-10-21 Thread GitBox


xiarixiaoyao commented on a change in pull request #3203:
URL: https://github.com/apache/hudi/pull/3203#discussion_r733521589



##
File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java
##
@@ -161,6 +162,46 @@
     return rtSplits.toArray(new InputSplit[0]);
   }
 
+  // pick all incremental files and add them to rtSplits then filter out those files.
+  private static Map<Path, List<FileSplit>> filterOutIncrementalSplits(
+      List<FileSplit> fileSplitList,

Review comment:
   The key point is not the number of commits; the key point is that at commit3, file3 replaces file1 and file2. We can set the start commit slightly smaller than commit2 and set max commits to 1:
   set hoodie.testlog.consume.max.commits=1
   set hoodie.testlog.consume.start.timestamp=commit2-1
   Then commit2 will be picked.
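   A minimal sketch of that setup in code, assuming the standard Hudi Hive consume properties read by HoodieHiveUtils and a hypothetical table named "testlog"; the start timestamp value below is only a placeholder standing in for "just below commit2":

   import org.apache.hadoop.mapred.JobConf;

   public class IncrementalConsumeConfSketch {
     public static void main(String[] args) {
       JobConf conf = new JobConf();
       // Query the table incrementally instead of as a snapshot.
       conf.set("hoodie.testlog.consume.mode", "INCREMENTAL");
       // Consume at most one commit per batch.
       conf.set("hoodie.testlog.consume.max.commits", "1");
       // Start just below commit2's timestamp so that only commit2 is picked.
       conf.set("hoodie.testlog.consume.start.timestamp", "20211020000001");
     }
   }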








[GitHub] [hudi] xiarixiaoyao commented on a change in pull request #3203: [HUDI-2086] Refactor hive mor_incremental_view

2021-10-21 Thread GitBox


xiarixiaoyao commented on a change in pull request #3203:
URL: https://github.com/apache/hudi/pull/3203#discussion_r733458463



##
File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java
##
@@ -161,6 +162,46 @@
     return rtSplits.toArray(new InputSplit[0]);
   }
 
+  // pick all incremental files and add them to rtSplits then filter out those files.
+  private static Map<Path, List<FileSplit>> filterOutIncrementalSplits(
+      List<FileSplit> fileSplitList,

Review comment:
   @danny0405 
   I remembered why I did this.
   
   We cannot read incremental data that was written before the replacecommit.
   
   Line 122 returns the latest fileSlices.
   
   Line 134 uses those latest fileSlices to filter the queried input splits.
   
   Suppose our table has 3 commits:
   commit1  file: (file1)
   commit2  file: (file2)
   replacecommit  file: (file3)
   
   Now we want to query the incremental data of commit2 (file2 will be picked as the input split).
   
   Line 122 will give us the wrong fileSlices (these fileSlices only contain file3).
   Line 134, when we filter using those fileSlices, excludes all the incremental input splits, since the incremental input split is file2 but the fileSlices only contain file3.
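   A minimal, self-contained sketch of that failure mode (hypothetical file ids; plain strings stand in for the real FileSlice and InputSplit objects):

   import java.util.Arrays;
   import java.util.List;
   import java.util.stream.Collectors;

   public class ReplaceCommitFilterSketch {
     public static void main(String[] args) {
       // After the replacecommit, only file3 is left in the latest file slices.
       List<String> latestFileSlices = Arrays.asList("file3");
       // The incremental query of commit2 picks file2 as its input split.
       List<String> incrementalSplits = Arrays.asList("file2");
       // Filtering the incremental splits against the latest slices drops file2,
       // so commit2's incremental data would be silently lost.
       List<String> surviving = incrementalSplits.stream()
           .filter(latestFileSlices::contains)
           .collect(Collectors.toList());
       System.out.println(surviving); // prints []
     }
   }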








[GitHub] [hudi] xiarixiaoyao commented on a change in pull request #3203: [HUDI-2086] Refactor hive mor_incremental_view

2021-10-20 Thread GitBox


xiarixiaoyao commented on a change in pull request #3203:
URL: https://github.com/apache/hudi/pull/3203#discussion_r732502115



##
File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java
##
@@ -161,6 +162,46 @@
     return rtSplits.toArray(new InputSplit[0]);
   }
 
+  // pick all incremental files and add them to rtSplits then filter out those files.
+  private static Map<Path, List<FileSplit>> filterOutIncrementalSplits(
+      List<FileSplit> fileSplitList,

Review comment:
   @danny0405 thanks. I will try and update the code.








[GitHub] [hudi] xiarixiaoyao commented on a change in pull request #3203: [HUDI-2086] Refactor hive mor_incremental_view

2021-10-14 Thread GitBox


xiarixiaoyao commented on a change in pull request #3203:
URL: https://github.com/apache/hudi/pull/3203#discussion_r728890684



##
File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java
##
@@ -161,6 +162,51 @@
     return rtSplits.toArray(new InputSplit[0]);
   }
 
+  // pick all incremental files and add them to rtSplits then filter out those files.
+  private static Map<Path, List<FileSplit>> filterOutIncrementalSplits(
+      List<FileSplit> fileSplitList,
+      List<InputSplit> rtSplits,
+      final Option<HoodieVirtualKeyInfo> finalHoodieVirtualKeyInfo) {
+    return fileSplitList.stream().filter(s -> {
+      // deal with incremental query.
+      try {
+        if (s instanceof BaseFileWithLogsSplit) {
+          BaseFileWithLogsSplit bs = (BaseFileWithLogsSplit) s;
+          if (bs.getBelongToIncrementalSplit()) {
+            rtSplits.add(new HoodieRealtimeFileSplit(bs, bs.getBasePath(), bs.getDeltaLogPaths(), bs.getMaxCommitTime(), finalHoodieVirtualKeyInfo));
+          }
+        } else if (s instanceof RealtimeBootstrapBaseFileSplit) {
+          rtSplits.add(s);
+        }
+      } catch (IOException e) {
+        throw new HoodieIOException("Error creating hoodie real time split ", e);
+      }
+      // filter the snapshot split.
+      if (s instanceof RealtimeBootstrapBaseFileSplit) {
+        return false;
+      } else if ((s instanceof BaseFileWithLogsSplit) && ((BaseFileWithLogsSplit) s).getBelongToIncrementalSplit()) {

Review comment:
   I just wanted to split the logic of the incremental query and the snapshot query. OK, I will change it, thanks.








[GitHub] [hudi] xiarixiaoyao commented on a change in pull request #3203: [HUDI-2086] Refactor hive mor_incremental_view

2021-10-14 Thread GitBox


xiarixiaoyao commented on a change in pull request #3203:
URL: https://github.com/apache/hudi/pull/3203#discussion_r728743555



##
File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java
##
@@ -119,6 +307,11 @@ void addProjectionToJobConf(final RealtimeSplit realtimeSplit, final JobConf job
     addProjectionToJobConf(realtimeSplit, jobConf);
     LOG.info("Creating record reader with readCols :" + jobConf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR)
         + ", Ids :" + jobConf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR));
+
+    // for a log-only split we do not need a parquet reader, so set it to empty
+    if (FSUtils.isLogFile(realtimeSplit.getPath())) {

Review comment:
   agree

##
File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java
##
@@ -69,17 +70,18 @@
   private static final Logger LOG = LogManager.getLogger(HoodieRealtimeInputFormatUtils.class);
 
   public static InputSplit[] getRealtimeSplits(Configuration conf, Stream<FileSplit> fileSplits) throws IOException {
-    Map<Path, List<FileSplit>> partitionsToParquetSplits =
-        fileSplits.collect(Collectors.groupingBy(split -> split.getPath().getParent()));
+    // for all unique split parents, obtain all delta files based on delta commit timeline,
+    // grouped on file id
+    List<InputSplit> rtSplits = new ArrayList<>();
+    List<FileSplit> candidateFileSplits = fileSplits.collect(Collectors.toList());
+    Map<Path, List<FileSplit>> partitionsToParquetSplits = candidateFileSplits

Review comment:
   ok








[GitHub] [hudi] xiarixiaoyao commented on a change in pull request #3203: [HUDI-2086] Refactor hive mor_incremental_view

2021-10-14 Thread GitBox


xiarixiaoyao commented on a change in pull request #3203:
URL: https://github.com/apache/hudi/pull/3203#discussion_r728743157



##
File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieEmptyRecordReader.java
##
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.hadoop.realtime;
+
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+
+import java.io.IOException;
+
+/**
+ * Dummy record reader for a log-only realtime split.
+ */

Review comment:
   agree
   








[GitHub] [hudi] xiarixiaoyao commented on a change in pull request #3203: [HUDI-2086] Refactor hive mor_incremental_view

2021-10-14 Thread GitBox


xiarixiaoyao commented on a change in pull request #3203:
URL: https://github.com/apache/hudi/pull/3203#discussion_r728741908



##
File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java
##
@@ -66,6 +90,170 @@
     return HoodieRealtimeInputFormatUtils.getRealtimeSplits(job, fileSplits);
   }
 
+  /**
+   * Keep the logic of mor_incr_view the same as the spark datasource.
+   * Step1: Get the list of commits to be fetched based on the start commit and max commits (for snapshot, max commits is -1).

Review comment:
   agree








[GitHub] [hudi] xiarixiaoyao commented on a change in pull request #3203: [HUDI-2086] Refactor hive mor_incremental_view

2021-10-12 Thread GitBox


xiarixiaoyao commented on a change in pull request #3203:
URL: https://github.com/apache/hudi/pull/3203#discussion_r726838452



##
File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java
##
@@ -99,7 +101,32 @@
         }
       }
       Option<HoodieVirtualKeyInfo> finalHoodieVirtualKeyInfo = hoodieVirtualKeyInfo;
-      partitionsToParquetSplits.keySet().forEach(partitionPath -> {
+      // deal with incremental query
+      candidateFileSplits.stream().forEach(s -> {
+        try {

Review comment:
   Thanks. If we do it in one iteration, the logic for handling the incremental query and the logic for handling the snapshot query get mixed together; are you sure we need to do that?
   OK, I will try to do it in one iteration.








[GitHub] [hudi] xiarixiaoyao commented on a change in pull request #3203: [HUDI-2086] Refactor hive mor_incremental_view

2021-10-11 Thread GitBox


xiarixiaoyao commented on a change in pull request #3203:
URL: https://github.com/apache/hudi/pull/3203#discussion_r726802648



##
File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/BaseFileWithLogsSplit.java
##
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.hadoop;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileSplit;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+public class BaseFileWithLogsSplit extends FileSplit {

Review comment:
   agree








[GitHub] [hudi] xiarixiaoyao commented on a change in pull request #3203: [HUDI-2086] Refactor hive mor_incremental_view

2021-10-11 Thread GitBox


xiarixiaoyao commented on a change in pull request #3203:
URL: https://github.com/apache/hudi/pull/3203#discussion_r726732125



##
File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/BaseFileWithLogsSplit.java
##
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.hadoop;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileSplit;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * we need to encode additional information in split to track matching log file and base files.

Review comment:
   This is a bridge class; every BaseFileWithLogsSplit is eventually converted to a HoodieRealtimeFileSplit. I think we should not add too many things to HoodieRealtimeFileSplit.








[GitHub] [hudi] xiarixiaoyao commented on a change in pull request #3203: [HUDI-2086] Refactor hive mor_incremental_view

2021-10-11 Thread GitBox


xiarixiaoyao commented on a change in pull request #3203:
URL: https://github.com/apache/hudi/pull/3203#discussion_r726731332



##
File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/BaseFileWithLogsSplit.java
##
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.hadoop;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileSplit;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * we need to encode additional information in split to track matching log file and base files.
+ * Hence, this weird looking class which tracks a log/base file split.
+ */
+public class BaseFileWithLogsSplit extends FileSplit {
+  // a flag to mark this split is produced by incremental query or not.
+  private boolean belongToIncrementalSplit = false;

Review comment:
   agree, thanks








[GitHub] [hudi] xiarixiaoyao commented on a change in pull request #3203: [HUDI-2086] Refactor hive mor_incremental_view

2021-10-10 Thread GitBox


xiarixiaoyao commented on a change in pull request #3203:
URL: https://github.com/apache/hudi/pull/3203#discussion_r725781031



##
File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java
##
@@ -66,6 +91,139 @@
     return HoodieRealtimeInputFormatUtils.getRealtimeSplits(job, fileSplits);
   }
 
+  /**
+   * Keep the logic of mor_incr_view the same as the spark datasource.
+   * TODO: unify the incremental view code between hive/spark-sql and spark datasource
+   */
+  @Override
+  protected List<FileStatus> listStatusForIncrementalMode(
+      JobConf job, HoodieTableMetaClient tableMetaClient, List<Path> inputPaths) throws IOException {
+    List<FileStatus> result = new ArrayList<>();
+    String tableName = tableMetaClient.getTableConfig().getTableName();
+    Job jobContext = Job.getInstance(job);
+
+    Option<HoodieTimeline> timeline = HoodieInputFormatUtils.getFilteredCommitsTimeline(jobContext, tableMetaClient);
+    if (!timeline.isPresent()) {
+      return result;
+    }
+    String lastIncrementalTs = HoodieHiveUtils.readStartCommitTime(jobContext, tableName);
+    // Total number of commits to return in this batch. Set this to -1 to get all the commits.
+    Integer maxCommits = HoodieHiveUtils.readMaxCommits(jobContext, tableName);
+    HoodieTimeline commitsTimelineToReturn = timeline.get().findInstantsAfter(lastIncrementalTs, maxCommits);
+    Option<List<HoodieInstant>> commitsToCheck = Option.of(commitsTimelineToReturn.getInstants().collect(Collectors.toList()));
+    if (!commitsToCheck.isPresent()) {
+      return result;
+    }
+    Map<String, Map<String, FileStatus>> partitionsWithFileStatus = HoodieInputFormatUtils
+        .listAffectedFilesForCommits(new Path(tableMetaClient.getBasePath()), commitsToCheck.get(), commitsTimelineToReturn);
+    // build fileGroup from fsView
+    List<FileStatus> affectedFileStatus = new ArrayList<>();
+    partitionsWithFileStatus.forEach((key, value) -> value.forEach((k, v) -> affectedFileStatus.add(v)));
+    HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(tableMetaClient, commitsTimelineToReturn, affectedFileStatus.toArray(new FileStatus[0]));
+    // build fileGroup from fsView
+    String basePath = tableMetaClient.getBasePath();
+    // filter affectedPartition by inputPaths
+    List<String> affectedPartition = partitionsWithFileStatus.keySet().stream()
+        .filter(k -> k.isEmpty() ? inputPaths.contains(new Path(basePath)) : inputPaths.contains(new Path(basePath, k))).collect(Collectors.toList());
+    if (affectedPartition.isEmpty()) {
+      return result;
+    }
+    List<HoodieFileGroup> fileGroups = affectedPartition.stream()
+        .flatMap(partitionPath -> fsView.getAllFileGroups(partitionPath)).collect(Collectors.toList());
+    setInputPaths(job, affectedPartition.stream()
+        .map(p -> p.isEmpty() ? basePath : new Path(basePath, p).toUri().toString()).collect(Collectors.joining(",")));
+
+    // find all file statuses in the current partitionPath
+    FileStatus[] fileStatuses = getStatus(job);
+    Map<String, FileStatus> candidateFileStatus = new HashMap<>();
+    for (int i = 0; i < fileStatuses.length; i++) {
+      String key = fileStatuses[i].getPath().toString();
+      candidateFileStatus.put(key, fileStatuses[i]);
+    }
+
+    String maxCommitTime = fsView.getLastInstant().get().getTimestamp();
+    fileGroups.stream().forEach(f -> {
+      try {
+        List<FileSlice> baseFiles = f.getAllFileSlices().filter(slice -> slice.getBaseFile().isPresent()).collect(Collectors.toList());
+        if (!baseFiles.isEmpty()) {
+          FileStatus baseFileStatus = HoodieInputFormatUtils.getFileStatus(baseFiles.get(0).getBaseFile().get());
+          String baseFilePath = baseFileStatus.getPath().toUri().toString();
+          if (!candidateFileStatus.containsKey(baseFilePath)) {
+            throw new HoodieException("Error obtaining fileStatus for file: " + baseFilePath);
+          }
+          RealtimeFileStatus fileStatus = new RealtimeFileStatus(candidateFileStatus.get(baseFilePath));
+          fileStatus.setMaxCommitTime(maxCommitTime);
+          fileStatus.setBelongToIncrementalFileStatus(true);
+          fileStatus.setBasePath(basePath);
+          fileStatus.setBaseFilePath(baseFilePath);
+          fileStatus.setDeltaLogPaths(f.getLatestFileSlice().get().getLogFiles().map(l -> l.getPath().toString()).collect(Collectors.toList()));

Review comment:
   // Ensure we get the base file when there is a pending compaction, which means the base file
   // won't be in the latest file slice.
   See the doc of MergeOnReadIncrementalRelation:
   https://github.com/apache/hudi/blob/ceace1c653a3ce3c97e6ee5a244d71ff1806be4f/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala#L203



