[GitHub] [hudi] danny0405 commented on a change in pull request #3203: [HUDI-2086] Refactor hive mor_incremental_view

2021-10-27 Thread GitBox


danny0405 commented on a change in pull request #3203:
URL: https://github.com/apache/hudi/pull/3203#discussion_r737998861



##
File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java
##
@@ -161,6 +148,110 @@
 return rtSplits.toArray(new InputSplit[0]);
   }
 
+  // get IncrementalRealtimeSplits
+  public static InputSplit[] getIncrementalRealtimeSplits(Configuration conf, Stream<FileSplit> fileSplits) throws IOException {
+    List<InputSplit> rtSplits = new ArrayList<>();
+    List<FileSplit> fileSplitList = fileSplits.collect(Collectors.toList());
+    Set<Path> partitionSet = fileSplitList.stream().map(f -> f.getPath().getParent()).collect(Collectors.toSet());
+    Map<Path, HoodieTableMetaClient> partitionsToMetaClient = getTableMetaClientByPartitionPath(conf, partitionSet);
+    // Pre process tableConfig from first partition to fetch virtual key info
+    Option<HoodieVirtualKeyInfo> hoodieVirtualKeyInfo = Option.empty();
+    if (partitionSet.size() > 0) {
+      hoodieVirtualKeyInfo = getHoodieVirtualKeyInfo(partitionsToMetaClient.get(partitionSet.iterator().next()));
+    }
+    Option<HoodieVirtualKeyInfo> finalHoodieVirtualKeyInfo = hoodieVirtualKeyInfo;
+    fileSplitList.stream().forEach(s -> {
+      // deal with incremental query.
+      try {
+        if (s instanceof BaseFileWithLogsSplit) {
+          BaseFileWithLogsSplit bs = (BaseFileWithLogsSplit) s;
+          if (bs.getBelongToIncrementalSplit()) {
+            rtSplits.add(new HoodieRealtimeFileSplit(bs, bs.getBasePath(), bs.getDeltaLogPaths(), bs.getMaxCommitTime(), finalHoodieVirtualKeyInfo));
+          }
+        } else if (s instanceof RealtimeBootstrapBaseFileSplit) {
+          rtSplits.add(s);
+        }
+      } catch (IOException e) {
+        throw new HoodieIOException("Error creating hoodie real time split ", e);
+      }
+    });
+    LOG.info("Returning a total splits of " + rtSplits.size());
+    return rtSplits.toArray(new InputSplit[0]);
+  }
+
+  public static Option<HoodieVirtualKeyInfo> getHoodieVirtualKeyInfo(HoodieTableMetaClient metaClient) {
+    HoodieTableConfig tableConfig = metaClient.getTableConfig();
+    if (!tableConfig.populateMetaFields()) {
+      TableSchemaResolver tableSchemaResolver = new TableSchemaResolver(metaClient);
+      try {
+        MessageType parquetSchema = tableSchemaResolver.getTableParquetSchema();
+        return Option.of(new HoodieVirtualKeyInfo(tableConfig.getRecordKeyFieldProp(),
+            tableConfig.getPartitionFieldProp(), parquetSchema.getFieldIndex(tableConfig.getRecordKeyFieldProp()),
+            parquetSchema.getFieldIndex(tableConfig.getPartitionFieldProp())));
+      } catch (Exception exception) {
+        throw new HoodieException("Fetching table schema failed with exception ", exception);
+      }
+    }
+    return Option.empty();
+  }
+
+  public static boolean isIncrementalQuerySplits(List<FileSplit> fileSplits) {
+    if (fileSplits == null || fileSplits.size() == 0) {
+      return false;
+    }
+    return fileSplits.stream().anyMatch(s -> {
+      if (s instanceof BaseFileWithLogsSplit) {
+        BaseFileWithLogsSplit bs = (BaseFileWithLogsSplit) s;
+        if (bs.getBelongToIncrementalSplit()) {
+          return true;
+        }
+      } else if (s instanceof RealtimeBootstrapBaseFileSplit) {
+        return true;
+      }
+      return false;
+    });
+  }
+
+  // Pick all incremental files and add them to rtSplits then filter out those files.
+  private static Map<Path, List<FileSplit>> filterOutIncrementalSplits(
+      List<FileSplit> fileSplitList,

Review comment:
   This code is unused now: `#filterOutIncrementalSplits`.








[GitHub] [hudi] danny0405 commented on a change in pull request #3203: [HUDI-2086] Refactor hive mor_incremental_view

2021-10-27 Thread GitBox


danny0405 commented on a change in pull request #3203:
URL: https://github.com/apache/hudi/pull/3203#discussion_r737998688



##
File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java
##
@@ -161,6 +148,110 @@
 return rtSplits.toArray(new InputSplit[0]);
   }
 
+  // get IncrementalRealtimeSplits
+  public static InputSplit[] getIncrementalRealtimeSplits(Configuration conf, Stream<FileSplit> fileSplits) throws IOException {
+    List<InputSplit> rtSplits = new ArrayList<>();
+    List<FileSplit> fileSplitList = fileSplits.collect(Collectors.toList());
+    Set<Path> partitionSet = fileSplitList.stream().map(f -> f.getPath().getParent()).collect(Collectors.toSet());
+    Map<Path, HoodieTableMetaClient> partitionsToMetaClient = getTableMetaClientByPartitionPath(conf, partitionSet);
+    // Pre process tableConfig from first partition to fetch virtual key info
+    Option<HoodieVirtualKeyInfo> hoodieVirtualKeyInfo = Option.empty();
+    if (partitionSet.size() > 0) {
+      hoodieVirtualKeyInfo = getHoodieVirtualKeyInfo(partitionsToMetaClient.get(partitionSet.iterator().next()));
+    }
+    Option<HoodieVirtualKeyInfo> finalHoodieVirtualKeyInfo = hoodieVirtualKeyInfo;
+    fileSplitList.stream().forEach(s -> {
+      // deal with incremental query.
+      try {
+        if (s instanceof BaseFileWithLogsSplit) {
+          BaseFileWithLogsSplit bs = (BaseFileWithLogsSplit) s;
+          if (bs.getBelongToIncrementalSplit()) {
+            rtSplits.add(new HoodieRealtimeFileSplit(bs, bs.getBasePath(), bs.getDeltaLogPaths(), bs.getMaxCommitTime(), finalHoodieVirtualKeyInfo));
+          }
+        } else if (s instanceof RealtimeBootstrapBaseFileSplit) {
+          rtSplits.add(s);
+        }
+      } catch (IOException e) {
+        throw new HoodieIOException("Error creating hoodie real time split ", e);
+      }
+    });
+    LOG.info("Returning a total splits of " + rtSplits.size());
+    return rtSplits.toArray(new InputSplit[0]);
+  }
+
+  public static Option<HoodieVirtualKeyInfo> getHoodieVirtualKeyInfo(HoodieTableMetaClient metaClient) {
+    HoodieTableConfig tableConfig = metaClient.getTableConfig();
+    if (!tableConfig.populateMetaFields()) {
+      TableSchemaResolver tableSchemaResolver = new TableSchemaResolver(metaClient);
+      try {
+        MessageType parquetSchema = tableSchemaResolver.getTableParquetSchema();
+        return Option.of(new HoodieVirtualKeyInfo(tableConfig.getRecordKeyFieldProp(),
+            tableConfig.getPartitionFieldProp(), parquetSchema.getFieldIndex(tableConfig.getRecordKeyFieldProp()),
+            parquetSchema.getFieldIndex(tableConfig.getPartitionFieldProp())));
+      } catch (Exception exception) {
+        throw new HoodieException("Fetching table schema failed with exception ", exception);
+      }
+    }
+    return Option.empty();
+  }
+
+  public static boolean isIncrementalQuerySplits(List<FileSplit> fileSplits) {
+    if (fileSplits == null || fileSplits.size() == 0) {
+      return false;
+    }
+    return fileSplits.stream().anyMatch(s -> {
+      if (s instanceof BaseFileWithLogsSplit) {

Review comment:
   Can be simplified as:
   ```java
   return fileSplits.stream().anyMatch(s -> {
     if (s instanceof BaseFileWithLogsSplit) {
       BaseFileWithLogsSplit bs = (BaseFileWithLogsSplit) s;
       return bs.getBelongToIncrementalSplit();
     } else {
       return s instanceof RealtimeBootstrapBaseFileSplit;
     }
   });
   ```
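   For reference, the same predicate can also be collapsed into a single boolean expression; this is only a sketch of the variant above with the generics spelled out, not code from the PR:
   ```java
   // Sketch only: one-expression form of the suggested anyMatch body.
   public static boolean isIncrementalQuerySplits(List<FileSplit> fileSplits) {
     return fileSplits != null && fileSplits.stream().anyMatch(s ->
         (s instanceof BaseFileWithLogsSplit && ((BaseFileWithLogsSplit) s).getBelongToIncrementalSplit())
             || s instanceof RealtimeBootstrapBaseFileSplit);
   }
   ```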








[GitHub] [hudi] danny0405 commented on a change in pull request #3203: [HUDI-2086] Refactor hive mor_incremental_view

2021-10-25 Thread GitBox


danny0405 commented on a change in pull request #3203:
URL: https://github.com/apache/hudi/pull/3203#discussion_r736128496



##
File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java
##
@@ -161,6 +162,46 @@
 return rtSplits.toArray(new InputSplit[0]);
   }
 
+  // pick all incremental files and add them to rtSplits then filter out those files.
+  private static Map<Path, List<FileSplit>> filterOutIncrementalSplits(
+      List<FileSplit> fileSplitList,

Review comment:
   Thanks, I got your idea. The input param `fileSplits` comes from the scan of the Hive input format, so it should include all the files on the query path, because `HoodieParquetRealtimeInputFormat#collectAllIncrementalFiles` generates the input splits from all the file slices.
   
   And the incremental query uses the same code path here as the snapshot query, which complicates the code. Maybe we can split the incremental query code out of the method `getRealtimeSplits` into a new method `getIncrementalRealtimeSplits`? WDYT :)
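   For illustration, the dispatch between the two paths could look roughly like the sketch below; `getSplits` is a hypothetical wrapper name, and it assumes `isIncrementalQuerySplits` and `getIncrementalRealtimeSplits` keep the signatures shown in the hunks above:
   ```java
   // Hedged sketch: route incremental and snapshot queries to separate methods.
   public static InputSplit[] getSplits(Configuration conf, Stream<FileSplit> fileSplits) throws IOException {
     List<FileSplit> fileSplitList = fileSplits.collect(Collectors.toList());
     if (isIncrementalQuerySplits(fileSplitList)) {
       // Incremental-only path, split out of getRealtimeSplits as proposed.
       return getIncrementalRealtimeSplits(conf, fileSplitList.stream());
     }
     // Existing snapshot path.
     return getRealtimeSplits(conf, fileSplitList.stream());
   }
   ```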








[GitHub] [hudi] danny0405 commented on a change in pull request #3203: [HUDI-2086] Refactor hive mor_incremental_view

2021-10-21 Thread GitBox


danny0405 commented on a change in pull request #3203:
URL: https://github.com/apache/hudi/pull/3203#discussion_r733479600



##
File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java
##
@@ -161,6 +162,46 @@
 return rtSplits.toArray(new InputSplit[0]);
   }
 
+  // pick all incremental files and add them to rtSplits then filter out those files.
+  private static Map<Path, List<FileSplit>> filterOutIncrementalSplits(
+      List<FileSplit> fileSplitList,

Review comment:
   > now we want to query the incremental data of commit2 (file2 will be picked as inputSplit);
   
   What is the start commit time of the incremental query? If the start time is commit2, I guess the incremental commit is not commit2 (or file2)?
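   To make the question concrete, a self-contained sketch assuming exclusive-start semantics (i.e. an incremental query returns commits strictly after the start commit; the commit names are made up):
   ```java
   import java.util.Arrays;
   import java.util.List;
   import java.util.stream.Collectors;

   public class IncrementalRangeSketch {
     public static void main(String[] args) {
       // Commit timestamps sort lexicographically in this toy example.
       List<String> commits = Arrays.asList("commit1", "commit2", "commit3");
       String startCommit = "commit2";
       List<String> incremental = commits.stream()
           .filter(c -> c.compareTo(startCommit) > 0) // strictly after the start commit
           .collect(Collectors.toList());
       System.out.println(incremental); // [commit3] -- commit2/file2 itself is excluded
     }
   }
   ```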








[GitHub] [hudi] danny0405 commented on a change in pull request #3203: [HUDI-2086] Refactor hive mor_incremental_view

2021-10-19 Thread GitBox


danny0405 commented on a change in pull request #3203:
URL: https://github.com/apache/hudi/pull/3203#discussion_r731567300



##
File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java
##
@@ -161,6 +162,46 @@
 return rtSplits.toArray(new InputSplit[0]);
   }
 
+  // pick all incremental files and add them to rtSplits then filter out those files.
+  private static Map<Path, List<FileSplit>> filterOutIncrementalSplits(
+      List<FileSplit> fileSplitList,

Review comment:
   I still think there is no need to handle the incremental splits first. Can we just merge the handling into lines 139 ~ 148, so that the logic for `BaseFileWithLogsSplit` can be reused? A sketch follows below.
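   A hedged sketch of what that merge might look like, reusing the `BaseFileWithLogsSplit` branch for both query types; the names mirror the hunk above, and `snapshotSplits` is a hypothetical accumulator for the existing snapshot grouping:
   ```java
   // Sketch only: one pass over the splits instead of a separate incremental pre-filter.
   // The enclosing method is assumed to declare "throws IOException".
   for (FileSplit s : fileSplitList) {
     if (s instanceof BaseFileWithLogsSplit && ((BaseFileWithLogsSplit) s).getBelongToIncrementalSplit()) {
       BaseFileWithLogsSplit bs = (BaseFileWithLogsSplit) s;
       rtSplits.add(new HoodieRealtimeFileSplit(bs, bs.getBasePath(), bs.getDeltaLogPaths(),
           bs.getMaxCommitTime(), finalHoodieVirtualKeyInfo));
     } else {
       snapshotSplits.add(s); // falls through to the usual partition grouping
     }
   }
   ```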








[GitHub] [hudi] danny0405 commented on a change in pull request #3203: [HUDI-2086] Refactor hive mor_incremental_view

2021-10-14 Thread GitBox


danny0405 commented on a change in pull request #3203:
URL: https://github.com/apache/hudi/pull/3203#discussion_r728830538



##
File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java
##
@@ -161,6 +162,51 @@
 return rtSplits.toArray(new InputSplit[0]);
   }
 
+  // pick all incremental files and add them to rtSplits then filter out those files.
+  private static Map<Path, List<FileSplit>> filterOutIncrementalSplits(
+      List<FileSplit> fileSplitList,
+      List<InputSplit> rtSplits,
+      final Option<HoodieVirtualKeyInfo> finalHoodieVirtualKeyInfo) {
+    return fileSplitList.stream().filter(s -> {
+      // deal with incremental query.
+      try {
+        if (s instanceof BaseFileWithLogsSplit) {
+          BaseFileWithLogsSplit bs = (BaseFileWithLogsSplit) s;
+          if (bs.getBelongToIncrementalSplit()) {
+            rtSplits.add(new HoodieRealtimeFileSplit(bs, bs.getBasePath(), bs.getDeltaLogPaths(), bs.getMaxCommitTime(), finalHoodieVirtualKeyInfo));
+          }
+        } else if (s instanceof RealtimeBootstrapBaseFileSplit) {
+          rtSplits.add(s);
+        }
+      } catch (IOException e) {
+        throw new HoodieIOException("Error creating hoodie real time split ", e);
+      }
+      // filter the snapshot split.
+      if (s instanceof RealtimeBootstrapBaseFileSplit) {
+        return false;
+      } else if ((s instanceof BaseFileWithLogsSplit) && ((BaseFileWithLogsSplit) s).getBelongToIncrementalSplit()) {

Review comment:
   Why not just return early? I have pasted the code. And why do we need to handle the incremental query first? Can we handle it together with the snapshot query?








[GitHub] [hudi] danny0405 commented on a change in pull request #3203: [HUDI-2086] Refactor hive mor_incremental_view

2021-10-14 Thread GitBox


danny0405 commented on a change in pull request #3203:
URL: https://github.com/apache/hudi/pull/3203#discussion_r728695624



##
File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java
##
@@ -69,17 +70,18 @@
  private static final Logger LOG = LogManager.getLogger(HoodieRealtimeInputFormatUtils.class);
 
   public static InputSplit[] getRealtimeSplits(Configuration conf, Stream<FileSplit> fileSplits) throws IOException {
-    Map<Path, List<FileSplit>> partitionsToParquetSplits =
-        fileSplits.collect(Collectors.groupingBy(split -> split.getPath().getParent()));
+    // for all unique split parents, obtain all delta files based on delta commit timeline,
+    // grouped on file id
+    List<InputSplit> rtSplits = new ArrayList<>();
+    List<FileSplit> candidateFileSplits = fileSplits.collect(Collectors.toList());
+    Map<Path, List<FileSplit>> partitionsToParquetSplits = candidateFileSplits

Review comment:
   `candidateFileSplits` => `fileSplitList`

##
File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java
##
@@ -119,6 +307,11 @@ void addProjectionToJobConf(final RealtimeSplit realtimeSplit, final JobConf job
     addProjectionToJobConf(realtimeSplit, jobConf);
     LOG.info("Creating record reader with readCols :" + jobConf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR)
         + ", Ids :" + jobConf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR));
+
+    // for log only split, we no need parquet reader, set it to empty
+    if (FSUtils.isLogFile(realtimeSplit.getPath())) {

Review comment:
   `we no need parquet reader, set it to empty` => `set the parquet reader as empty`
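   For context, the branch under discussion looks roughly like the sketch below; the `HoodieEmptyRecordReader(RealtimeSplit, JobConf)` constructor shape is an assumption about the class added in this PR:
   ```java
   // Sketch of the log-only branch: there is no base parquet file to read,
   // so hand the realtime reader an empty delegate instead of a parquet reader.
   if (FSUtils.isLogFile(realtimeSplit.getPath())) {
     return new HoodieRealtimeRecordReader(realtimeSplit, jobConf,
         new HoodieEmptyRecordReader(realtimeSplit, jobConf)); // constructor assumed
   }
   ```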

##
File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieEmptyRecordReader.java
##
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.hadoop.realtime;
+
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+
+import java.io.IOException;
+
+/**
+ * dummy record for log only realtime split.
+ */

Review comment:
   `dummy record` => `Dummy record reader`
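   Roughly, such a reader just reports end-of-input immediately. A self-contained sketch (the class name here is made up; the PR's actual `HoodieEmptyRecordReader` may differ in details):
   ```java
   import org.apache.hadoop.io.ArrayWritable;
   import org.apache.hadoop.io.NullWritable;
   import org.apache.hadoop.io.Text;
   import org.apache.hadoop.mapred.RecordReader;

   // Sketch only: a RecordReader that yields no records, for log-only realtime splits.
   public class EmptyArrayWritableReader implements RecordReader<NullWritable, ArrayWritable> {
     @Override public boolean next(NullWritable key, ArrayWritable value) { return false; } // nothing to read
     @Override public NullWritable createKey() { return NullWritable.get(); }
     @Override public ArrayWritable createValue() { return new ArrayWritable(Text.class); }
     @Override public long getPos() { return 0; }
     @Override public void close() { }
     @Override public float getProgress() { return 1.0f; }
   }
   ```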

##
File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java
##
@@ -99,7 +101,32 @@
     }
   }
   Option<HoodieVirtualKeyInfo> finalHoodieVirtualKeyInfo = hoodieVirtualKeyInfo;
-  partitionsToParquetSplits.keySet().forEach(partitionPath -> {
+  // deal with incremental query
+  candidateFileSplits.stream().forEach(s -> {
+    try {

Review comment:
   Something like this:
   ```java
   private static Map<Path, List<FileSplit>> filterOutIncrementalSplits(
       List<FileSplit> fileSplitList,
       List<InputSplit> rtSplits,
       final Option<HoodieVirtualKeyInfo> finalHoodieVirtualKeyInfo) {
     return fileSplitList.stream().filter(s -> {
       // deal with incremental query.
       try {
         if (s instanceof BaseFileWithLogsSplit) {
           BaseFileWithLogsSplit bs = (BaseFileWithLogsSplit) s;
           if (bs.getBelongToIncrementalSplit()) {
             rtSplits.add(new HoodieRealtimeFileSplit(bs, bs.getBasePath(), bs.getDeltaLogPaths(), bs.getMaxCommitTime(), finalHoodieVirtualKeyInfo));
             return false;
           }
         } else if (s instanceof RealtimeBootstrapBaseFileSplit) {
           rtSplits.add(s);
           return false;
         }
       } catch (IOException e) {
         throw new HoodieIOException("Error creating hoodie real time split ", e);
       }
       return true;
     }).collect(Collectors.groupingBy(split -> split.getPath().getParent()));
   }
   ```
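   For context, a hedged sketch of how a caller might consume this helper; the names mirror the snippet above, not the PR's final wiring:
   ```java
   // rtSplits collects the incremental splits as a side effect; the returned map
   // holds the remaining snapshot splits, grouped by partition path.
   Map<Path, List<FileSplit>> partitionsToParquetSplits =
       filterOutIncrementalSplits(fileSplitList, rtSplits, finalHoodieVirtualKeyInfo);
   ```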

[GitHub] [hudi] danny0405 commented on a change in pull request #3203: [HUDI-2086] Refactor hive mor_incremental_view

2021-10-13 Thread GitBox


danny0405 commented on a change in pull request #3203:
URL: https://github.com/apache/hudi/pull/3203#discussion_r728651340



##
File path: 
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java
##
@@ -66,6 +90,170 @@
 return HoodieRealtimeInputFormatUtils.getRealtimeSplits(job, fileSplits);
   }
 
+  /**
+   * Keep the logical of mor_incr_view as same as spark datasource.
+   * Step1: Get list of commits to be fetched based on start commit and max commits(for snapshot max commits is -1).

Review comment:
   `logical` is an adjective; please use the noun `logic` instead.








[GitHub] [hudi] danny0405 commented on a change in pull request #3203: [HUDI-2086] Refactor hive mor_incremental_view

2021-10-10 Thread GitBox


danny0405 commented on a change in pull request #3203:
URL: https://github.com/apache/hudi/pull/3203#discussion_r725764091



##
File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/PathWithLogFilePath.java
##
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.hadoop;
+
+import org.apache.hadoop.fs.Path;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * we need to encode additional information in Path to track matching log file and base files.
+ * Hence, this weird looking class which tracks an log/base file status
+ */
+public class PathWithLogFilePath extends Path {
+  // a flag to mark this split is produced by incremental query or not.
+  private boolean belongToIncrementalPath = false;
+  // the log files belong this path.
+  private List<String> deltaLogPaths = new ArrayList<>();
+  // max commit time of current path.
+  private String maxCommitTime = "";
+  // the basePath of current hoodie table.
+  private String basePath = "";
+  // the base file belong to this path;
+  private String baseFilePath = "";

Review comment:
   the base file path belong to this path.

##
File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/BaseFileWithLogsSplit.java
##
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.hadoop;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileSplit;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+public class BaseFileWithLogsSplit extends FileSplit {

Review comment:
   `We need to encode additional information in split to track matching base and log files. Hence, this weird looking class which tracks a log/base file split`

##
File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
##
@@ -102,97 +102,107 @@ private HoodieMergedLogRecordScanner getMergedLogRecordScanner() throws IOExcept
 
   @Override
   public boolean next(NullWritable aVoid, ArrayWritable arrayWritable) throws IOException {
+    // deal with DeltaOnlySplits
+    if (logReader.isPresent()) {
+      return logReader.get().next(aVoid, arrayWritable);
+    }
     // Call the underlying parquetReader.next - which may replace the passed in ArrayWritable
     // with a new block of values
-    while (this.parquetReader.next(aVoid, arrayWritable)) {
-      if (!deltaRecordMap.isEmpty()) {
-        String key = arrayWritable.get()[recordKeyIndex].toString();
-        if (deltaRecordMap.containsKey(key)) {
-          // mark the key as handled
-          this.deltaRecordKeys.remove(key);
-          // TODO(NA): Invoke preCombine here by converting arrayWritable to Avro. This is required since the
-          // deltaRecord may not be a full record and needs values of columns from the parquet
-          Option<GenericRecord> rec = buildGenericRecordwithCustomPayload(deltaRecordMap.get(key));
-          // If the record is not present, this is a delete record using an empty payload so skip this base record
-          // and move to the next record
-          if (!rec.isPresent()) {
-            continue;
+    boolean result = this.parquetReader.next(aVoid, arrayWritable);
+    if (!result) {
+      // if the result is false, then there are 
[GitHub] [hudi] danny0405 commented on a change in pull request #3203: [HUDI-2086] Refactor hive mor_incremental_view

2021-10-10 Thread GitBox


danny0405 commented on a change in pull request #3203:
URL: https://github.com/apache/hudi/pull/3203#discussion_r725759641



##
File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/BaseFileWithLogsSplit.java
##
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.hadoop;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileSplit;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+public class BaseFileWithLogsSplit extends FileSplit {

Review comment:
   All the comments should start with an uppercase character (`we` => `We`) and end with a dot `.`.
   Please fix all of the remaining comments.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org