Re: [PR] [HUDI-6801] Implement merging partial updates from log files for MOR tables [hudi]

2023-10-31 Thread via GitHub


yihua merged PR #9883:
URL: https://github.com/apache/hudi/pull/9883


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [HUDI-6801] Implement merging partial updates from log files for MOR tables [hudi]

2023-10-29 Thread via GitHub


hudi-bot commented on PR #9883:
URL: https://github.com/apache/hudi/pull/9883#issuecomment-1784519715

   
   ## CI report:
   
   * 78480b553a25033b4f642518c92402e6acd181b4 UNKNOWN
   * 5a7c85e5f7fe7cc56f202a4174492e9e630b7a0b Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=20552)

   Bot commands

   @hudi-bot supports the following commands:

   - `@hudi-bot run azure` re-run the last Azure build





Re: [PR] [HUDI-6801] Implement merging partial updates from log files for MOR tables [hudi]

2023-10-29 Thread via GitHub


hudi-bot commented on PR #9883:
URL: https://github.com/apache/hudi/pull/9883#issuecomment-1784482430

   
   ## CI report:
   
   * 78480b553a25033b4f642518c92402e6acd181b4 UNKNOWN
   * 69bed1eb6e45fda4b2380ae16f78004968570781 Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=20538)
   * 5a7c85e5f7fe7cc56f202a4174492e9e630b7a0b UNKNOWN
   
   



Re: [PR] [HUDI-6801] Implement merging partial updates from log files for MOR tables [hudi]

2023-10-29 Thread via GitHub


yihua commented on code in PR #9883:
URL: https://github.com/apache/hudi/pull/9883#discussion_r1375668820


##
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordMerger.java:
##
@@ -43,9 +44,27 @@ public interface HoodieRecordMerger extends Serializable {
    * This method converges combineAndGetUpdateValue and precombine from HoodiePayload.
    * It'd be associative operation: f(a, f(b, c)) = f(f(a, b), c) (which we can translate as having 3 versions A, B, C
    * of the single record, both orders of operations applications have to yield the same result)
+   * This method takes only full records for merging.
    */
   Option<Pair<HoodieRecord, Schema>> merge(HoodieRecord older, Schema oldSchema, HoodieRecord newer, Schema newSchema, TypedProperties props) throws IOException;
 
+  /**
+   * Merges records which can contain partial updates, i.e., only subset of fields and values are
+   * present in the record representing the updates, and absent fields are not updated.

Review Comment:
   Examples added in docs.
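The `merge` Javadoc above requires associativity: f(a, f(b, c)) = f(f(a, b), c). The minimal Java sketch below shows why this holds for an overlay-style partial merge, in which each field ends up with the value from the latest record that carries it. It models records as hypothetical field-name-to-value maps rather than `HoodieRecord`/`InternalRow`. `AssociativityCheck`, `f`, and `record` are illustrative names, not Hudi API.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Hypothetical check of the associativity property f(a, f(b, c)) = f(f(a, b), c)
 * for an overlay merge: fields present in the newer record replace the older values.
 */
final class AssociativityCheck {

  /** Overlay merge: start from the older record, then apply every field of the newer one. */
  static Map<String, Object> f(Map<String, Object> older, Map<String, Object> newer) {
    Map<String, Object> merged = new LinkedHashMap<>(older);
    merged.putAll(newer);
    return merged;
  }

  /** Builds a record from alternating field names and values. */
  static Map<String, Object> record(Object... kv) {
    Map<String, Object> r = new LinkedHashMap<>();
    for (int i = 0; i < kv.length; i += 2) {
      r.put((String) kv[i], kv[i + 1]);
    }
    return r;
  }

  public static void main(String[] args) {
    Map<String, Object> a = record("id", 1, "name", "a1", "price", 10.0); // full record
    Map<String, Object> b = record("price", 11.0);                       // partial update
    Map<String, Object> c = record("name", "a2");                        // partial update
    Map<String, Object> left = f(a, f(b, c));
    Map<String, Object> right = f(f(a, b), c);
    System.out.println(left.equals(right)); // true
    System.out.println(left);               // {id=1, name=a2, price=11.0}
  }
}
```

The sketch ignores ordering fields. Once ordering values can reorder the operands, associativity has to be argued separately.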






Re: [PR] [HUDI-6801] Implement merging partial updates from log files for MOR tables [hudi]

2023-10-29 Thread via GitHub


danny0405 commented on code in PR #9883:
URL: https://github.com/apache/hudi/pull/9883#discussion_r1375616920


##
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordMerger.java:
##
@@ -43,9 +44,27 @@ public interface HoodieRecordMerger extends Serializable {
    * This method converges combineAndGetUpdateValue and precombine from HoodiePayload.
    * It'd be associative operation: f(a, f(b, c)) = f(f(a, b), c) (which we can translate as having 3 versions A, B, C
    * of the single record, both orders of operations applications have to yield the same result)
+   * This method takes only full records for merging.
    */
   Option<Pair<HoodieRecord, Schema>> merge(HoodieRecord older, Schema oldSchema, HoodieRecord newer, Schema newSchema, TypedProperties props) throws IOException;
 
+  /**
+   * Merges records which can contain partial updates, i.e., only subset of fields and values are
+   * present in the record representing the updates, and absent fields are not updated.

Review Comment:
   Maybe a simple demo to demonstrate the update behaviors, like what we do in `PartialUpdateAvroPayload`.
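A small demo of the behavior described in the new Javadoc might look like the sketch below. It assumes a field counts as "absent" when it is missing from the update, not when its value is null. This follows the field-name lookup against the partial schema in `SparkRecordMergingUtils`. `PartialUpdateDemo` and the map-based records are hypothetical stand-ins for Hudi's record types.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Hypothetical demo of partial-update semantics: fields carried by the update
 * overwrite the older values (even with null); fields missing from it are kept.
 */
final class PartialUpdateDemo {

  static Map<String, Object> mergePartial(Map<String, Object> older, Map<String, Object> newerPartial) {
    // Start from the complete older record so that absent fields are retained.
    Map<String, Object> merged = new LinkedHashMap<>(older);
    // Overwrite only the fields present in the partial update.
    merged.putAll(newerPartial);
    return merged;
  }

  public static void main(String[] args) {
    Map<String, Object> older = new LinkedHashMap<>();
    older.put("id", 1);
    older.put("name", "a1");
    older.put("price", 10.0);
    older.put("ts", 1000L);

    // The update carries only the key, the new price and the ordering field.
    Map<String, Object> newerPartial = new LinkedHashMap<>();
    newerPartial.put("id", 1);
    newerPartial.put("price", 12.5);
    newerPartial.put("ts", 1001L);
    System.out.println(mergePartial(older, newerPartial)); // {id=1, name=a1, price=12.5, ts=1001}

    // A field that is present with a null value is still an update under this assumption.
    Map<String, Object> nullName = new LinkedHashMap<>();
    nullName.put("name", null);
    System.out.println(mergePartial(older, nullName)); // {id=1, name=null, price=10.0, ts=1000}
  }
}
```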






Re: [PR] [HUDI-6801] Implement merging partial updates from log files for MOR tables [hudi]

2023-10-28 Thread via GitHub


hudi-bot commented on PR #9883:
URL: https://github.com/apache/hudi/pull/9883#issuecomment-1783735545

   
   ## CI report:
   
   * 78480b553a25033b4f642518c92402e6acd181b4 UNKNOWN
   * 69bed1eb6e45fda4b2380ae16f78004968570781 Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=20538)
   
   



Re: [PR] [HUDI-6801] Implement merging partial updates from log files for MOR tables [hudi]

2023-10-28 Thread via GitHub


hudi-bot commented on PR #9883:
URL: https://github.com/apache/hudi/pull/9883#issuecomment-1783725463

   
   ## CI report:
   
   * 78480b553a25033b4f642518c92402e6acd181b4 UNKNOWN
   * 03c759528edb8ce1e0997b5b98921fa6343f1f22 Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=20535)
   * 69bed1eb6e45fda4b2380ae16f78004968570781 UNKNOWN
   
   



Re: [PR] [HUDI-6801] Implement merging partial updates from log files for MOR tables [hudi]

2023-10-28 Thread via GitHub


yihua commented on code in PR #9883:
URL: https://github.com/apache/hudi/pull/9883#discussion_r1375179496


##
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/merge/SparkRecordMergingUtils.java:
##
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.merge;
+
+import org.apache.hudi.AvroConversionUtils;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.model.HoodieSparkRecord;
+import org.apache.hudi.common.util.collection.Pair;
+
+import org.apache.avro.Schema;
+import org.apache.spark.sql.HoodieInternalRowUtils;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Util class to merge records that may contain partial updates.
+ * This can be plugged into any Spark {@link HoodieRecordMerger} implementation.
+ */
+public class SparkRecordMergingUtils {
+  private static final Map<Schema, Map<Integer, StructField>> FIELD_ID_TO_FIELD_MAPPING_CACHE = new ConcurrentHashMap<>();
+  private static final Map<Schema, Map<String, Integer>> FIELD_NAME_TO_ID_MAPPING_CACHE = new ConcurrentHashMap<>();
+  private static final Map<Pair<Pair<Schema, Schema>, Schema>,
+      Pair<Map<Integer, StructField>, Pair<StructType, Schema>>> MERGED_SCHEMA_CACHE = new ConcurrentHashMap<>();
+
+  /**
+   * Merges records which can contain partial updates.
+   *
+   * @param older        Older {@link HoodieSparkRecord}.
+   * @param oldSchema    Old schema.
+   * @param newer        Newer {@link HoodieSparkRecord}.
+   * @param newSchema    New schema.
+   * @param readerSchema Reader schema containing all the fields to read.
+   * @param props        Configuration in {@link TypedProperties}.
+   * @return The merged record.
+   */
+  public static Pair<HoodieRecord, Schema> mergeCompleteOrPartialRecords(HoodieSparkRecord older,
+      Schema oldSchema,
+      HoodieSparkRecord newer,
+      Schema newSchema,
+      Schema readerSchema,
+      TypedProperties props) {
+    Pair<Map<Integer, StructField>, Pair<StructType, Schema>> mergedSchemaPair =
+        getCachedMergedSchema(oldSchema, newSchema, readerSchema);
+    boolean isNewerPartial = isPartial(newSchema, mergedSchemaPair.getRight().getRight());
+    if (isNewerPartial) {
+      InternalRow oldRow = older.getData();
+      InternalRow newPartialRow = newer.getData();
+
+      Map<Integer, StructField> mergedIdToFieldMapping = mergedSchemaPair.getLeft();
+      Map<String, Integer> newPartialNameToIdMapping = getCachedFieldNameToIdMapping(newSchema);
+      List<Object> values = new ArrayList<>(mergedIdToFieldMapping.size());
+      for (int fieldId = 0; fieldId < mergedIdToFieldMapping.size(); fieldId++) {
+        StructField structField = mergedIdToFieldMapping.get(fieldId);
+        Integer ordInPartialUpdate = newPartialNameToIdMapping.get(structField.name());
+        if (ordInPartialUpdate != null) {
+          // pick from new
+          values.add(newPartialRow.get(ordInPartialUpdate, structField.dataType()));
+        } else {
+          // pick from old
+          values.add(oldRow.get(fieldId, structField.dataType()));
+        }
+      }
+      InternalRow mergedRow = new GenericInternalRow(values.toArray());
+
+      HoodieSparkRecord mergedSparkRecord = new HoodieSparkRecord(
+          mergedRow, mergedSchemaPair.getRight().getLeft());
+      return Pair.of(mergedSparkRecord, mergedSchemaPair.getRight().getRight());
+    } else {
+      return Pair.of(newer, newSchema);
+    }
+  }
+
+  /**
+   * @param avroSchema Avro schema.
+   * @return The field ID to {@link StructField} instance mapping.
+   */
+  public 
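The field-by-field loop in `mergeCompleteOrPartialRecords` can be sketched without Spark, as shown below. The sketch makes three assumptions:

- Plain `Object[]` rows and lists of field names stand in for `InternalRow` and `StructType`.
- The older row is already laid out in the merged schema's field order, since the loop reads `oldRow.get(fieldId, ...)` with the merged field ID.
- `OrdinalRowMerge` and its methods are hypothetical names.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical, Spark-free sketch of the merge loop: iterate the merged schema's
 * fields, take the value from the partial row when it carries the field (looked up
 * by name), otherwise take it from the older row at the same merged ordinal.
 */
final class OrdinalRowMerge {

  /** Field-name-to-ordinal lookup, analogous in spirit to getCachedFieldNameToIdMapping. */
  static Map<String, Integer> nameToOrdinal(List<String> fieldNames) {
    Map<String, Integer> mapping = new HashMap<>();
    for (int i = 0; i < fieldNames.size(); i++) {
      mapping.put(fieldNames.get(i), i);
    }
    return mapping;
  }

  static Object[] merge(Object[] oldRow, List<String> mergedFields,
                        Object[] newPartialRow, List<String> newPartialFields) {
    Map<String, Integer> newPartialNameToId = nameToOrdinal(newPartialFields);
    Object[] values = new Object[mergedFields.size()];
    for (int fieldId = 0; fieldId < mergedFields.size(); fieldId++) {
      Integer ordInPartialUpdate = newPartialNameToId.get(mergedFields.get(fieldId));
      // Pick from the newer row if the partial update has the field, else from the older row.
      values[fieldId] = ordInPartialUpdate != null ? newPartialRow[ordInPartialUpdate] : oldRow[fieldId];
    }
    return values;
  }

  public static void main(String[] args) {
    List<String> merged = Arrays.asList("id", "name", "price", "ts");
    List<String> partial = Arrays.asList("id", "ts", "price"); // field order differs on purpose
    Object[] oldRow = {1, "a1", 10.0, 1000L};
    Object[] newRow = {1, 1001L, 12.5};
    System.out.println(Arrays.toString(merge(oldRow, merged, newRow, partial))); // [1, a1, 12.5, 1001]
  }
}
```

Looking fields up by name rather than by position lets the partial schema list its fields in any order.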

Re: [PR] [HUDI-6801] Implement merging partial updates from log files for MOR tables [hudi]

2023-10-27 Thread via GitHub


yihua commented on code in PR #9883:
URL: https://github.com/apache/hudi/pull/9883#discussion_r1375179372


##
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/merge/SparkRecordMergingUtils.java:
##
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.merge;
+
+import org.apache.hudi.AvroConversionUtils;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.model.HoodieSparkRecord;
+import org.apache.hudi.common.util.collection.Pair;
+
+import org.apache.avro.Schema;
+import org.apache.spark.sql.HoodieInternalRowUtils;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Util class to merge records that may contain partial updates.
+ * This can be plugged into any Spark {@link HoodieRecordMerger} implementation.
+ */
+public class SparkRecordMergingUtils {
+  private static final Map<Schema, Map<Integer, StructField>> FIELD_ID_TO_FIELD_MAPPING_CACHE = new ConcurrentHashMap<>();
+  private static final Map<Schema, Map<String, Integer>> FIELD_NAME_TO_ID_MAPPING_CACHE = new ConcurrentHashMap<>();
+  private static final Map<Pair<Pair<Schema, Schema>, Schema>,
+      Pair<Map<Integer, StructField>, Pair<StructType, Schema>>> MERGED_SCHEMA_CACHE = new ConcurrentHashMap<>();
+
+  /**
+   * Merges records which can contain partial updates.
+   *
+   * @param older        Older {@link HoodieSparkRecord}.
+   * @param oldSchema    Old schema.
+   * @param newer        Newer {@link HoodieSparkRecord}.
+   * @param newSchema    New schema.
+   * @param readerSchema Reader schema containing all the fields to read.
+   * @param props        Configuration in {@link TypedProperties}.
+   * @return The merged record.
+   */
+  public static Pair<HoodieRecord, Schema> mergeCompleteOrPartialRecords(HoodieSparkRecord older,
+      Schema oldSchema,
+      HoodieSparkRecord newer,
+      Schema newSchema,
+      Schema readerSchema,
+      TypedProperties props) {
+    Pair<Map<Integer, StructField>, Pair<StructType, Schema>> mergedSchemaPair =
+        getCachedMergedSchema(oldSchema, newSchema, readerSchema);
+    boolean isNewerPartial = isPartial(newSchema, mergedSchemaPair.getRight().getRight());
+    if (isNewerPartial) {
+      InternalRow oldRow = older.getData();
+      InternalRow newPartialRow = newer.getData();
+
+      Map<Integer, StructField> mergedIdToFieldMapping = mergedSchemaPair.getLeft();
+      Map<String, Integer> newPartialNameToIdMapping = getCachedFieldNameToIdMapping(newSchema);
+      List<Object> values = new ArrayList<>(mergedIdToFieldMapping.size());
+      for (int fieldId = 0; fieldId < mergedIdToFieldMapping.size(); fieldId++) {
+        StructField structField = mergedIdToFieldMapping.get(fieldId);
+        Integer ordInPartialUpdate = newPartialNameToIdMapping.get(structField.name());
+        if (ordInPartialUpdate != null) {
+          // pick from new
+          values.add(newPartialRow.get(ordInPartialUpdate, structField.dataType()));
+        } else {
+          // pick from old
+          values.add(oldRow.get(fieldId, structField.dataType()));
+        }
+      }
+      InternalRow mergedRow = new GenericInternalRow(values.toArray());
+
+      HoodieSparkRecord mergedSparkRecord = new HoodieSparkRecord(
+          mergedRow, mergedSchemaPair.getRight().getLeft());
+      return Pair.of(mergedSparkRecord, mergedSchemaPair.getRight().getRight());
+    } else {
+      return Pair.of(newer, newSchema);
+    }
+  }
+
+  /**
+   * @param avroSchema Avro schema.
+   * @return The field ID to {@link StructField} instance mapping.
+   */
+  public 

Re: [PR] [HUDI-6801] Implement merging partial updates from log files for MOR tables [hudi]

2023-10-27 Thread via GitHub


yihua commented on code in PR #9883:
URL: https://github.com/apache/hudi/pull/9883#discussion_r1375179246


##
hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedFileGroupRecordBuffer.java:
##
@@ -84,6 +79,10 @@ public void processDataBlock(HoodieDataBlock dataBlock, Option<KeySpec> keySpecO
       isFullKey = keySpecOpt.get().isFullKey();
     }
 
+    if (dataBlock.containsPartialUpdates()) {
+      enablePartialMerging = true;
+    }

Review Comment:
   The buffer consumes multiple log blocks in the file group. Once any log block contains partial updates, we switch to partial merging and do not switch back for the blocks that follow, to guarantee correct merging results.
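Below is a simplified model of the switch described above. A hypothetical `LogBlock` type with a single `containsPartialUpdates` flag stands in for `HoodieDataBlock`, and the actual record buffering and merging are omitted. The flag only ever moves from false to true. As a result, a full-record block read after a partial one is still handled by the partial merging path.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical model of a sticky partial-merging switch: once any processed log block
 * contains partial updates, every later block in the file group uses partial merging.
 */
final class StickyPartialMergeFlag {

  /** Minimal stand-in for a log data block; only the partial-update marker matters here. */
  static final class LogBlock {
    final boolean containsPartialUpdates;

    LogBlock(boolean containsPartialUpdates) {
      this.containsPartialUpdates = containsPartialUpdates;
    }
  }

  private boolean enablePartialMerging = false;
  // Records which merge mode was used for each processed block, for illustration only.
  private final List<String> modes = new ArrayList<>();

  void processDataBlock(LogBlock block) {
    if (block.containsPartialUpdates) {
      enablePartialMerging = true; // never reset to false for this file group
    }
    modes.add(enablePartialMerging ? "partialMerge" : "merge");
  }

  List<String> modes() {
    return modes;
  }

  public static void main(String[] args) {
    StickyPartialMergeFlag buffer = new StickyPartialMergeFlag();
    buffer.processDataBlock(new LogBlock(false));
    buffer.processDataBlock(new LogBlock(true));
    buffer.processDataBlock(new LogBlock(false)); // full-record block after a partial one
    System.out.println(buffer.modes()); // [merge, partialMerge, partialMerge]
  }
}
```

Records already buffered from an earlier partial block may themselves be partial. Switching back to the full-record path would therefore risk dropping fields, which is presumably the correctness concern behind not switching back.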






Re: [PR] [HUDI-6801] Implement merging partial updates from log files for MOR tables [hudi]

2023-10-27 Thread via GitHub


yihua commented on code in PR #9883:
URL: https://github.com/apache/hudi/pull/9883#discussion_r1375179085


##
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordMerger.java:
##
@@ -46,6 +48,9 @@ public interface HoodieRecordMerger extends Serializable {
    */
   Option<Pair<HoodieRecord, Schema>> merge(HoodieRecord older, Schema oldSchema, HoodieRecord newer, Schema newSchema, TypedProperties props) throws IOException;
 
+  default Option<Pair<HoodieRecord, Schema>> partialMerge(HoodieRecord older, Schema oldSchema, HoodieRecord newer, Schema newSchema, Schema readerSchema, TypedProperties props) throws IOException {
+    throw new HoodieException("Partial merging logic is not implemented.");

Review Comment:
   Fixed.






Re: [PR] [HUDI-6801] Implement merging partial updates from log files for MOR tables [hudi]

2023-10-27 Thread via GitHub


yihua commented on code in PR #9883:
URL: https://github.com/apache/hudi/pull/9883#discussion_r1375178992


##
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java:
##
@@ -440,7 +443,9 @@ private static void validateRow(InternalRow data, StructType schema) {
     //   corresponding schema has to be provided as well so that it could be properly
     //   serialized (in case it would need to be)
     boolean isValid = data == null || data instanceof UnsafeRow
-        || schema != null && (data instanceof HoodieInternalRow || SparkAdapterSupport$.MODULE$.sparkAdapter().isColumnarBatchRow(data));
+        || schema != null && (data instanceof HoodieInternalRow
+        || data instanceof GenericInternalRow

Review Comment:
   The line was too long. I've revised the indentation to make it clearer.






Re: [PR] [HUDI-6801] Implement merging partial updates from log files for MOR tables [hudi]

2023-10-27 Thread via GitHub


yihua commented on code in PR #9883:
URL: https://github.com/apache/hudi/pull/9883#discussion_r1375178539


##
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/HoodieSparkRecordMerger.java:
##
@@ -76,6 +77,45 @@ public Option<Pair<HoodieRecord, Schema>> merge(HoodieRecord older, Schema oldSc
     }
   }
 
+  @Override
+  public Option<Pair<HoodieRecord, Schema>> partialMerge(HoodieRecord older, Schema oldSchema, HoodieRecord newer, Schema newSchema, Schema readerSchema, TypedProperties props) throws IOException {
+    ValidationUtils.checkArgument(older.getRecordType() == HoodieRecordType.SPARK);
+    ValidationUtils.checkArgument(newer.getRecordType() == HoodieRecordType.SPARK);
+
+    if (newer instanceof HoodieSparkRecord) {
+      HoodieSparkRecord newSparkRecord = (HoodieSparkRecord) newer;
+      if (newSparkRecord.isDeleted()) {
+        // Delete record
+        return Option.empty();
+      }
+    } else {
+      if (newer.getData() == null) {
+        // Delete record
+        return Option.empty();
+      }
+    }
+
+    if (older instanceof HoodieSparkRecord) {
+      HoodieSparkRecord oldSparkRecord = (HoodieSparkRecord) older;
+      if (oldSparkRecord.isDeleted()) {
+        // use natural order for delete record
+        return Option.of(Pair.of(newer, newSchema));
+      }
+    } else {
+      if (older.getData() == null) {
+        // use natural order for delete record
+        return Option.of(Pair.of(newer, newSchema));
+      }
+    }
+    if (older.getOrderingValue(oldSchema, props).compareTo(newer.getOrderingValue(newSchema, props)) > 0) {
+      return Option.of(SparkRecordMergingUtils.mergeCompleteOrPartialRecords(
+          (HoodieSparkRecord) newer, newSchema, (HoodieSparkRecord) older, oldSchema, readerSchema, props));

Review Comment:
   Fixed.
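The sketch below traces the decision order in `partialMerge` above. It uses a hypothetical `Rec` type instead of `HoodieSparkRecord`: a field map plus a `long` ordering value, with a null map standing in for a delete. The hunk is cut off right after the swapped `mergeCompleteOrPartialRecords(newer, ..., older, ...)` call. The final branch, which applies the newer record on top of the older one, is therefore an assumption.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

/**
 * Hypothetical sketch of the branch order: deletes first, then the ordering value
 * decides which record's fields are applied on top of the other's.
 */
final class OrderingAwarePartialMerge {

  /** Stand-in for a record: a field map (null means delete) plus an ordering value. */
  static final class Rec {
    final Map<String, Object> fields;
    final long orderingValue;

    Rec(Map<String, Object> fields, long orderingValue) {
      this.fields = fields;
      this.orderingValue = orderingValue;
    }
  }

  /** Applies the fields present in {@code top} over {@code base}; absent fields keep base values. */
  static Rec overlay(Rec base, Rec top) {
    Map<String, Object> merged = new LinkedHashMap<>(base.fields);
    merged.putAll(top.fields);
    return new Rec(merged, top.orderingValue);
  }

  static Optional<Rec> partialMerge(Rec older, Rec newer) {
    if (newer.fields == null) {
      return Optional.empty(); // newer is a delete record
    }
    if (older.fields == null) {
      return Optional.of(newer); // natural order for a delete record: keep newer
    }
    if (older.orderingValue > newer.orderingValue) {
      // Older wins on ordering: roles are swapped and older is applied on top of newer.
      return Optional.of(overlay(newer, older));
    }
    // Assumed final branch (not visible in the hunk): newer is applied on top of older.
    return Optional.of(overlay(older, newer));
  }

  public static void main(String[] args) {
    Map<String, Object> full = new LinkedHashMap<>();
    full.put("id", 1);
    full.put("name", "a1");
    full.put("price", 10.0);
    Map<String, Object> partial = new LinkedHashMap<>();
    partial.put("price", 12.5);

    Rec older = new Rec(full, 1000L);
    System.out.println(partialMerge(older, new Rec(partial, 1001L)).get().fields); // {id=1, name=a1, price=12.5}
    System.out.println(partialMerge(older, new Rec(partial, 999L)).get().fields);  // {price=10.0, id=1, name=a1}
    System.out.println(partialMerge(older, new Rec(null, 1002L)).isPresent());     // false
  }
}
```

Ties in the ordering value go to the newer record, mirroring the strict `compareTo(...) > 0` check in the diff.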






Re: [PR] [HUDI-6801] Implement merging partial updates from log files for MOR tables [hudi]

2023-10-27 Thread via GitHub


yihua commented on code in PR #9883:
URL: https://github.com/apache/hudi/pull/9883#discussion_r1375178419


##
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/merge/SparkRecordMergingUtils.java:
##
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.merge;
+
+import org.apache.hudi.AvroConversionUtils;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.model.HoodieSparkRecord;
+import org.apache.hudi.common.util.collection.Pair;
+
+import org.apache.avro.Schema;
+import org.apache.spark.sql.HoodieInternalRowUtils;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Util class to merge records that may contain partial updates.
+ * This can be plugged into any Spark {@link HoodieRecordMerger} implementation.
+ */
+public class SparkRecordMergingUtils {
+  private static final Map<Schema, Map<Integer, StructField>> FIELD_ID_TO_FIELD_MAPPING_CACHE = new ConcurrentHashMap<>();
+  private static final Map<Schema, Map<String, Integer>> FIELD_NAME_TO_ID_MAPPING_CACHE = new ConcurrentHashMap<>();
+  private static final Map<Pair<Schema, Schema>,
+      Pair<Map<Integer, StructField>, Pair<StructType, Schema>>> MERGED_SCHEMA_CACHE = new ConcurrentHashMap<>();
+
+  /**
+   * Merges records which can contain partial updates.
+   *
+   * @param older Older {@link HoodieSparkRecord}.
+   * @param oldSchema Old schema.
+   * @param newer Newer {@link HoodieSparkRecord}.
+   * @param newSchema New schema.
+   * @param props Configuration in {@link TypedProperties}.
+   * @return The merged record.
+   */
+  public static Pair<HoodieRecord, Schema> mergeCompleteOrPartialRecords(HoodieSparkRecord older,
+      Schema oldSchema,
+      HoodieSparkRecord newer,
+      Schema newSchema,
+      TypedProperties props) {
+    Pair<Map<Integer, StructField>, Pair<StructType, Schema>> mappingSchemaPair =
+        getCachedMergedSchema(oldSchema, newSchema);
+    boolean isNewerPartial = isPartial(newSchema, mappingSchemaPair.getRight().getRight());

Review Comment:
   A new `partialMerge` API is added so that the partial merging logic is only invoked when necessary.
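Here is one way to picture the split between `merge` and `partialMerge`. The interface keeps a default `partialMerge` that fails unless a merger overrides it, and the caller takes that path only after partial updates have been seen. The sketch has these limitations:

- It uses map-based records.
- It throws `UnsupportedOperationException` where the diff throws `HoodieException`.
- `MergeDispatchSketch`, `OverwriteMerger`, `PartialAwareMerger`, and `mergeRecords` are hypothetical names.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Hypothetical sketch: full-record merging is mandatory, partial merging is opt-in,
 * and the partial path is only taken when the caller has seen partial updates.
 */
final class MergeDispatchSketch {

  interface RecordMerger {
    /** Merges two complete records. */
    Map<String, Object> merge(Map<String, Object> older, Map<String, Object> newer);

    /** Merges records that may be partial; fails unless a merger overrides it. */
    default Map<String, Object> partialMerge(Map<String, Object> older, Map<String, Object> newer) {
      throw new UnsupportedOperationException("Partial merging logic is not implemented.");
    }
  }

  /** Supports complete records only: the newer record replaces the older one. */
  static final class OverwriteMerger implements RecordMerger {
    @Override
    public Map<String, Object> merge(Map<String, Object> older, Map<String, Object> newer) {
      return newer;
    }
  }

  /** Also overrides partialMerge, overlaying the fields present in the newer record. */
  static final class PartialAwareMerger implements RecordMerger {
    @Override
    public Map<String, Object> merge(Map<String, Object> older, Map<String, Object> newer) {
      return newer;
    }

    @Override
    public Map<String, Object> partialMerge(Map<String, Object> older, Map<String, Object> newer) {
      Map<String, Object> merged = new LinkedHashMap<>(older);
      merged.putAll(newer);
      return merged;
    }
  }

  /** Takes the partial path only when partial updates have been seen. */
  static Map<String, Object> mergeRecords(RecordMerger merger, boolean enablePartialMerging,
                                          Map<String, Object> older, Map<String, Object> newer) {
    return enablePartialMerging ? merger.partialMerge(older, newer) : merger.merge(older, newer);
  }

  public static void main(String[] args) {
    Map<String, Object> older = new LinkedHashMap<>();
    older.put("name", "a1");
    older.put("price", 10.0);
    Map<String, Object> newer = new LinkedHashMap<>();
    newer.put("price", 12.5);

    System.out.println(mergeRecords(new OverwriteMerger(), false, older, newer));   // {price=12.5}
    System.out.println(mergeRecords(new PartialAwareMerger(), true, older, newer)); // {name=a1, price=12.5}
  }
}
```

Keeping `partialMerge` as a default method leaves existing `merge`-only implementations source compatible. A merger that never sees partial updates never pays for the schema-mapping work.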






Re: [PR] [HUDI-6801] Implement merging partial updates from log files for MOR tables [hudi]

2023-10-27 Thread via GitHub


yihua commented on code in PR #9883:
URL: https://github.com/apache/hudi/pull/9883#discussion_r1375178329


##
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/merge/SparkRecordMergingUtils.java:
##
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.merge;
+
+import org.apache.hudi.AvroConversionUtils;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.model.HoodieSparkRecord;
+import org.apache.hudi.common.util.collection.Pair;
+
+import org.apache.avro.Schema;
+import org.apache.spark.sql.HoodieInternalRowUtils;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Util class to merge records that may contain partial updates.
+ * This can be plugged into any Spark {@link HoodieRecordMerger} implementation.
+ */
+public class SparkRecordMergingUtils {
+  private static final Map<Schema, Map<Integer, StructField>> FIELD_ID_TO_FIELD_MAPPING_CACHE = new ConcurrentHashMap<>();
+  private static final Map<Schema, Map<String, Integer>> FIELD_NAME_TO_ID_MAPPING_CACHE = new ConcurrentHashMap<>();
+  private static final Map<Pair<Schema, Schema>,
+      Pair<Map<Integer, StructField>, Pair<StructType, Schema>>> MERGED_SCHEMA_CACHE = new ConcurrentHashMap<>();
+
+  /**
+   * Merges records which can contain partial updates.
+   *
+   * @param older Older {@link HoodieSparkRecord}.
+   * @param oldSchema Old schema.
+   * @param newer Newer {@link HoodieSparkRecord}.
+   * @param newSchema New schema.
+   * @param props Configuration in {@link TypedProperties}.
+   * @return The merged record.
+   */
+  public static Pair<HoodieRecord, Schema> mergeCompleteOrPartialRecords(HoodieSparkRecord older,
+      Schema oldSchema,
+      HoodieSparkRecord newer,
+      Schema newSchema,
+      TypedProperties props) {
+Pair<Map<Integer, StructField>, Pair<StructType, Schema>> mappingSchemaPair =
+getCachedMergedSchema(oldSchema, newSchema);
+boolean isNewerPartial = isPartial(newSchema, mappingSchemaPair.getRight().getRight());
+if (isNewerPartial) {
+  InternalRow oldRow = older.getData();
+  InternalRow newPartialRow = newer.getData();
+
+  Map<Integer, StructField> mergedIdToFieldMapping = mappingSchemaPair.getLeft();
+  Map<String, Integer> newPartialNameToIdMapping = getCachedFieldNameToIdMapping(newSchema);
+  List<Object> values = new ArrayList<>(mergedIdToFieldMapping.size());
+  for (int fieldId = 0; fieldId < mergedIdToFieldMapping.size(); fieldId++) {
+StructField structField = mergedIdToFieldMapping.get(fieldId);
+if (newPartialNameToIdMapping.containsKey(structField.name())) {
+  // pick from new
+  int ordInPartialUpdate = newPartialNameToIdMapping.get(structField.name());
+  values.add(newPartialRow.get(ordInPartialUpdate, structField.dataType()));
+} else {
+  // pick from old
+  values.add(oldRow.get(fieldId, structField.dataType()));
+}
+  }
+  InternalRow mergedRow = new GenericInternalRow(values.toArray());
+
+  HoodieSparkRecord mergedSparkRecord = new HoodieSparkRecord(
+  mergedRow, mappingSchemaPair.getRight().getLeft());
+  return Pair.of(mergedSparkRecord, mappingSchemaPair.getRight().getRight());
+} else {
+  return Pair.of(newer, newSchema);
+}
+  }
+
+  /**
+   * @param avroSchema Avro schema.
+   * @return The field ID to {@link StructField} instance mapping.
+   */
+  public static Map<Integer, StructField> getCachedFieldIdToFieldMapping(Schema avroSchema) {
+return FIELD_ID_TO_FIELD_MAPPING_CACHE.computeIfAbsent(avroSchema, schema -> {
+  StructType structType = 

Re: [PR] [HUDI-6801] Implement merging partial updates from log files for MOR tables [hudi]

2023-10-27 Thread via GitHub


yihua commented on code in PR #9883:
URL: https://github.com/apache/hudi/pull/9883#discussion_r1375178224


##
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala:
##
@@ -51,6 +61,28 @@ class SparkFileFormatInternalRowReaderContext(baseFileReader: PartitionedFile =>
  requiredSchema: Schema,
  conf: Configuration): ClosableIterator[InternalRow] = {
 val fileInfo = sparkAdapter.getSparkPartitionedFileUtils.createPartitionedFile(partitionValues,
 filePath, start, length)
-new CloseableInternalRowIterator(baseFileReader.apply(fileInfo))
+if (filePath.toString.contains(HoodieLogFile.DELTA_EXTENSION)) {

Review Comment:
   `FsUtils.isLogFile` is fixed to take the `inlinefs://` scheme into account.






Re: [PR] [HUDI-6801] Implement merging partial updates from log files for MOR tables [hudi]

2023-10-27 Thread via GitHub


danny0405 commented on code in PR #9883:
URL: https://github.com/apache/hudi/pull/9883#discussion_r1375161773


##
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/merge/SparkRecordMergingUtils.java:
##
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.merge;
+
+import org.apache.hudi.AvroConversionUtils;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.model.HoodieSparkRecord;
+import org.apache.hudi.common.util.collection.Pair;
+
+import org.apache.avro.Schema;
+import org.apache.spark.sql.HoodieInternalRowUtils;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Util class to merge records that may contain partial updates.
+ * This can be plugged into any Spark {@link HoodieRecordMerger} implementation.
+ */
+public class SparkRecordMergingUtils {
+  private static final Map<Schema, Map<Integer, StructField>> FIELD_ID_TO_FIELD_MAPPING_CACHE = new ConcurrentHashMap<>();
+  private static final Map<Schema, Map<String, Integer>> FIELD_NAME_TO_ID_MAPPING_CACHE = new ConcurrentHashMap<>();
+  private static final Map<Pair<Pair<Schema, Schema>, Schema>,
+      Pair<Map<Integer, StructField>, Pair<StructType, Schema>>> MERGED_SCHEMA_CACHE = new ConcurrentHashMap<>();
+
+  /**
+   * Merges records which can contain partial updates.
+   *
+   * @param older        Older {@link HoodieSparkRecord}.
+   * @param oldSchema    Old schema.
+   * @param newer        Newer {@link HoodieSparkRecord}.
+   * @param newSchema    New schema.
+   * @param readerSchema Reader schema containing all the fields to read.
+   * @param props        Configuration in {@link TypedProperties}.
+   * @return The merged record.
+   */
+  public static Pair<HoodieRecord, Schema> mergeCompleteOrPartialRecords(HoodieSparkRecord older,
+      Schema oldSchema,
+      HoodieSparkRecord newer,
+      Schema newSchema,
+      Schema readerSchema,
+      TypedProperties props) {
+Pair<Map<Integer, StructField>, Pair<StructType, Schema>> mergedSchemaPair =
+getCachedMergedSchema(oldSchema, newSchema, readerSchema);
+boolean isNewerPartial = isPartial(newSchema, mergedSchemaPair.getRight().getRight());
+if (isNewerPartial) {
+  InternalRow oldRow = older.getData();
+  InternalRow newPartialRow = newer.getData();
+
+  Map<Integer, StructField> mergedIdToFieldMapping = mergedSchemaPair.getLeft();
+  Map<String, Integer> newPartialNameToIdMapping = getCachedFieldNameToIdMapping(newSchema);
+  List<Object> values = new ArrayList<>(mergedIdToFieldMapping.size());
+  for (int fieldId = 0; fieldId < mergedIdToFieldMapping.size(); fieldId++) {
+StructField structField = mergedIdToFieldMapping.get(fieldId);
+Integer ordInPartialUpdate = newPartialNameToIdMapping.get(structField.name());
+if (ordInPartialUpdate != null) {
+  // pick from new
+  values.add(newPartialRow.get(ordInPartialUpdate, structField.dataType()));
+} else {
+  // pick from old
+  values.add(oldRow.get(fieldId, structField.dataType()));
+}
+  }
+  InternalRow mergedRow = new GenericInternalRow(values.toArray());
+
+  HoodieSparkRecord mergedSparkRecord = new HoodieSparkRecord(
+  mergedRow, mergedSchemaPair.getRight().getLeft());
+  return Pair.of(mergedSparkRecord, mergedSchemaPair.getRight().getRight());
+} else {
+  return Pair.of(newer, newSchema);
+}
+  }
+
+  /**
+   * @param avroSchema Avro schema.
+   * @return The field ID to {@link StructField} instance mapping.
+   */
+  public 

Re: [PR] [HUDI-6801] Implement merging partial updates from log files for MOR tables [hudi]

2023-10-27 Thread via GitHub


danny0405 commented on code in PR #9883:
URL: https://github.com/apache/hudi/pull/9883#discussion_r1375161288


##
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/merge/SparkRecordMergingUtils.java:
##
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.merge;
+
+import org.apache.hudi.AvroConversionUtils;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.model.HoodieSparkRecord;
+import org.apache.hudi.common.util.collection.Pair;
+
+import org.apache.avro.Schema;
+import org.apache.spark.sql.HoodieInternalRowUtils;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Util class to merge records that may contain partial updates.
+ * This can be plugged into any Spark {@link HoodieRecordMerger} implementation.
+ */
+public class SparkRecordMergingUtils {
+  private static final Map<Schema, Map<Integer, StructField>> FIELD_ID_TO_FIELD_MAPPING_CACHE = new ConcurrentHashMap<>();
+  private static final Map<Schema, Map<String, Integer>> FIELD_NAME_TO_ID_MAPPING_CACHE = new ConcurrentHashMap<>();
+  private static final Map<Pair<Pair<Schema, Schema>, Schema>,
+      Pair<Map<Integer, StructField>, Pair<StructType, Schema>>> MERGED_SCHEMA_CACHE = new ConcurrentHashMap<>();
+
+  /**
+   * Merges records which can contain partial updates.
+   *
+   * @param older        Older {@link HoodieSparkRecord}.
+   * @param oldSchema    Old schema.
+   * @param newer        Newer {@link HoodieSparkRecord}.
+   * @param newSchema    New schema.
+   * @param readerSchema Reader schema containing all the fields to read.
+   * @param props        Configuration in {@link TypedProperties}.
+   * @return The merged record.
+   */
+  public static Pair<HoodieRecord, Schema> mergeCompleteOrPartialRecords(HoodieSparkRecord older,
+      Schema oldSchema,
+      HoodieSparkRecord newer,
+      Schema newSchema,
+      Schema readerSchema,
+      TypedProperties props) {
+Pair<Map<Integer, StructField>, Pair<StructType, Schema>> mergedSchemaPair =
+getCachedMergedSchema(oldSchema, newSchema, readerSchema);
+boolean isNewerPartial = isPartial(newSchema, mergedSchemaPair.getRight().getRight());
+if (isNewerPartial) {
+  InternalRow oldRow = older.getData();
+  InternalRow newPartialRow = newer.getData();
+
+  Map<Integer, StructField> mergedIdToFieldMapping = mergedSchemaPair.getLeft();
+  Map<String, Integer> newPartialNameToIdMapping = getCachedFieldNameToIdMapping(newSchema);
+  List<Object> values = new ArrayList<>(mergedIdToFieldMapping.size());
+  for (int fieldId = 0; fieldId < mergedIdToFieldMapping.size(); fieldId++) {
+StructField structField = mergedIdToFieldMapping.get(fieldId);
+Integer ordInPartialUpdate = newPartialNameToIdMapping.get(structField.name());
+if (ordInPartialUpdate != null) {
+  // pick from new
+  values.add(newPartialRow.get(ordInPartialUpdate, structField.dataType()));
+} else {
+  // pick from old
+  values.add(oldRow.get(fieldId, structField.dataType()));
+}
+  }
+  InternalRow mergedRow = new GenericInternalRow(values.toArray());
+
+  HoodieSparkRecord mergedSparkRecord = new HoodieSparkRecord(
+  mergedRow, mergedSchemaPair.getRight().getLeft());
+  return Pair.of(mergedSparkRecord, mergedSchemaPair.getRight().getRight());
+} else {
+  return Pair.of(newer, newSchema);
+}
+  }
+
+  /**
+   * @param avroSchema Avro schema.
+   * @return The field ID to {@link StructField} instance mapping.
+   */
+  public 

Re: [PR] [HUDI-6801] Implement merging partial updates from log files for MOR tables [hudi]

2023-10-27 Thread via GitHub


danny0405 commented on code in PR #9883:
URL: https://github.com/apache/hudi/pull/9883#discussion_r1375160946


##
hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedFileGroupRecordBuffer.java:
##
@@ -84,6 +79,10 @@ public void processDataBlock(HoodieDataBlock dataBlock, Option keySpecO
   isFullKey = keySpecOpt.get().isFullKey();
 }
 
+if (dataBlock.containsPartialUpdates()) {
+  enablePartialMerging = true;
+}

Review Comment:
   Is there any chance this flag switches back to false? Can the buffer consume multiple log blocks?






Re: [PR] [HUDI-6801] Implement merging partial updates from log files for MOR tables [hudi]

2023-10-27 Thread via GitHub


danny0405 commented on code in PR #9883:
URL: https://github.com/apache/hudi/pull/9883#discussion_r1375160672


##
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordMerger.java:
##
@@ -46,6 +48,9 @@ public interface HoodieRecordMerger extends Serializable {
*/
   Option<Pair<HoodieRecord, Schema>> merge(HoodieRecord older, Schema oldSchema, HoodieRecord newer, Schema newSchema, TypedProperties props) throws IOException;
 
+  default Option<Pair<HoodieRecord, Schema>> partialMerge(HoodieRecord older, Schema oldSchema, HoodieRecord newer, Schema newSchema, Schema readerSchema, TypedProperties props) throws IOException {
+throw new HoodieException("Partial merging logic is not implemented.");

Review Comment:
   Should this throw `UnSupportedException` instead?
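If the suggestion refers to the JDK's standard `UnsupportedOperationException` (an assumption; the comment spells it `UnSupportedException`), the default-method pattern could look like the minimal sketch below. `SketchRecordMerger` and `LatestWinsMerger` are hypothetical, simplified stand-ins that use `String` records, not Hudi classes.

```java
import java.util.Optional;

// Hypothetical implementation that supports only full-record merging.
class LatestWinsMerger implements SketchRecordMerger {
  @Override
  public Optional<String> merge(String older, String newer) {
    return Optional.ofNullable(newer); // the newer value wins
  }

  public static void main(String[] args) {
    SketchRecordMerger merger = new LatestWinsMerger();
    System.out.println(merger.merge("a", "b").get()); // b
    try {
      merger.partialMerge("a", "b");
    } catch (UnsupportedOperationException e) {
      System.out.println("unsupported: " + e.getMessage());
    }
  }
}

// Hypothetical, simplified stand-in for a record merger interface.
interface SketchRecordMerger {
  Optional<String> merge(String older, String newer);

  // Implementations that do not override this fail fast with the JDK's
  // standard exception for an optional operation that is not supported.
  default Optional<String> partialMerge(String older, String newer) {
    throw new UnsupportedOperationException("Partial merging logic is not implemented.");
  }
}
```

With this shape, callers can tell "partial merging is not supported by this merger" apart from a genuine merge failure by catching a specific JDK exception type.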






Re: [PR] [HUDI-6801] Implement merging partial updates from log files for MOR tables [hudi]

2023-10-27 Thread via GitHub


danny0405 commented on code in PR #9883:
URL: https://github.com/apache/hudi/pull/9883#discussion_r1375159973


##
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java:
##
@@ -440,7 +443,9 @@ private static void validateRow(InternalRow data, StructType schema) {
 //   corresponding schema has to be provided as well so that it could be properly
 //   serialized (in case it would need to be)
 boolean isValid = data == null || data instanceof UnsafeRow
-|| schema != null && (data instanceof HoodieInternalRow || SparkAdapterSupport$.MODULE$.sparkAdapter().isColumnarBatchRow(data));
+|| schema != null && (data instanceof HoodieInternalRow
+|| data instanceof GenericInternalRow

Review Comment:
   The original indentation is clearer.






Re: [PR] [HUDI-6801] Implement merging partial updates from log files for MOR tables [hudi]

2023-10-27 Thread via GitHub


danny0405 commented on code in PR #9883:
URL: https://github.com/apache/hudi/pull/9883#discussion_r1375159611


##
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/HoodieSparkRecordMerger.java:
##
@@ -76,6 +77,45 @@ public Option<Pair<HoodieRecord, Schema>> merge(HoodieRecord older, Schema oldSc
 }
   }
 
+  @Override
+  public Option<Pair<HoodieRecord, Schema>> partialMerge(HoodieRecord older, Schema oldSchema, HoodieRecord newer, Schema newSchema, Schema readerSchema, TypedProperties props) throws IOException {
+ValidationUtils.checkArgument(older.getRecordType() == HoodieRecordType.SPARK);
+ValidationUtils.checkArgument(newer.getRecordType() == HoodieRecordType.SPARK);
+
+if (newer instanceof HoodieSparkRecord) {
+  HoodieSparkRecord newSparkRecord = (HoodieSparkRecord) newer;
+  if (newSparkRecord.isDeleted()) {
+// Delete record
+return Option.empty();
+  }
+} else {
+  if (newer.getData() == null) {
+// Delete record
+return Option.empty();
+  }
+}
+
+if (older instanceof HoodieSparkRecord) {
+  HoodieSparkRecord oldSparkRecord = (HoodieSparkRecord) older;
+  if (oldSparkRecord.isDeleted()) {
+// use natural order for delete record
+return Option.of(Pair.of(newer, newSchema));
+  }
+} else {
+  if (older.getData() == null) {
+// use natural order for delete record
+return Option.of(Pair.of(newer, newSchema));
+  }
+}
+if (older.getOrderingValue(oldSchema, props).compareTo(newer.getOrderingValue(newSchema, props)) > 0) {
+  return Option.of(SparkRecordMergingUtils.mergeCompleteOrPartialRecords(
+  (HoodieSparkRecord) newer, newSchema, (HoodieSparkRecord) older, oldSchema, readerSchema, props));

Review Comment:
   We can rename the method back to `mergePartialRecords` now.
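The control flow of `partialMerge` in the hunk above can be summarized in a minimal, self-contained sketch. It assumes a simplified record (`Rec`, with a `long` ordering value, a delete flag and a string payload) and a placeholder `overlay` in place of `SparkRecordMergingUtils.mergeCompleteOrPartialRecords`; all names are hypothetical. The quoted hunk is cut off after the `> 0` branch, so the final fall-through (newer applied on top of older) is an assumption.

```java
import java.util.Optional;

class OrderingMergeSketch {
  // Hypothetical minimal record: an ordering value, a delete marker and a payload.
  static final class Rec {
    final long ordering;
    final boolean deleted;
    final String payload;

    Rec(long ordering, boolean deleted, String payload) {
      this.ordering = ordering;
      this.deleted = deleted;
      this.payload = payload;
    }
  }

  // Placeholder for the field-level partial merge: `update` is applied on top of `base`.
  static Rec overlay(Rec base, Rec update) {
    return new Rec(Math.max(base.ordering, update.ordering), false, base.payload + "<-" + update.payload);
  }

  static Optional<Rec> partialMerge(Rec older, Rec newer) {
    if (newer.deleted) {
      return Optional.empty(); // newer delete: the record is dropped
    }
    if (older.deleted) {
      return Optional.of(newer); // natural order for an older delete
    }
    if (older.ordering > newer.ordering) {
      // The older record has the higher ordering value, so the roles are
      // swapped and it is applied on top of the newer one.
      return Optional.of(overlay(newer, older));
    }
    // Assumed default: the newer record is applied on top of the older one.
    return Optional.of(overlay(older, newer));
  }

  public static void main(String[] args) {
    System.out.println(partialMerge(new Rec(5, false, "old"), new Rec(3, false, "new")).get().payload); // new<-old
    System.out.println(partialMerge(new Rec(3, false, "old"), new Rec(5, false, "new")).get().payload); // old<-new
    System.out.println(partialMerge(new Rec(3, false, "old"), new Rec(5, true, "del")).isPresent()); // false
  }
}
```

Swapping the argument order, rather than adding a second merge routine, keeps a single "apply update on top of base" primitive; this is what the call with `newer` and `older` exchanged in the hunk appears to do.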






Re: [PR] [HUDI-6801] Implement merging partial updates from log files for MOR tables [hudi]

2023-10-27 Thread via GitHub


hudi-bot commented on PR #9883:
URL: https://github.com/apache/hudi/pull/9883#issuecomment-1783686118

   
   ## CI report:
   
   * 78480b553a25033b4f642518c92402e6acd181b4 UNKNOWN
   * 03c759528edb8ce1e0997b5b98921fa6343f1f22 Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=20535)
 
   
   
   Bot commands
 @hudi-bot supports the following commands:
   
- `@hudi-bot run azure` re-run the last Azure build
   





Re: [PR] [HUDI-6801] Implement merging partial updates from log files for MOR tables [hudi]

2023-10-27 Thread via GitHub


hudi-bot commented on PR #9883:
URL: https://github.com/apache/hudi/pull/9883#issuecomment-1783684492

   
   ## CI report:
   
   * 78480b553a25033b4f642518c92402e6acd181b4 UNKNOWN
   * 5b39c6dc67dbe4273bfcc704f644c925e76f8fa2 Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=20523)
 
   * 03c759528edb8ce1e0997b5b98921fa6343f1f22 UNKNOWN
   
   
   Bot commands
 @hudi-bot supports the following commands:
   
- `@hudi-bot run azure` re-run the last Azure build
   





Re: [PR] [HUDI-6801] Implement merging partial updates from log files for MOR tables [hudi]

2023-10-26 Thread via GitHub


hudi-bot commented on PR #9883:
URL: https://github.com/apache/hudi/pull/9883#issuecomment-1782250406

   
   ## CI report:
   
   * 78480b553a25033b4f642518c92402e6acd181b4 UNKNOWN
   * 5b39c6dc67dbe4273bfcc704f644c925e76f8fa2 Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=20523)
 
   
   
   Bot commands
 @hudi-bot supports the following commands:
   
- `@hudi-bot run azure` re-run the last Azure build
   





Re: [PR] [HUDI-6801] Implement merging partial updates from log files for MOR tables [hudi]

2023-10-26 Thread via GitHub


hudi-bot commented on PR #9883:
URL: https://github.com/apache/hudi/pull/9883#issuecomment-1782196487

   
   ## CI report:
   
   * 78480b553a25033b4f642518c92402e6acd181b4 UNKNOWN
   * 10f2ad7f6615a95666204ff834bff39b6e3f96ef Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=20505)
 
   * 5b39c6dc67dbe4273bfcc704f644c925e76f8fa2 Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=20523)
 
   
   
   Bot commands
 @hudi-bot supports the following commands:
   
- `@hudi-bot run azure` re-run the last Azure build
   





Re: [PR] [HUDI-6801] Implement merging partial updates from log files for MOR tables [hudi]

2023-10-26 Thread via GitHub


hudi-bot commented on PR #9883:
URL: https://github.com/apache/hudi/pull/9883#issuecomment-1782167419

   
   ## CI report:
   
   * 78480b553a25033b4f642518c92402e6acd181b4 UNKNOWN
   * 10f2ad7f6615a95666204ff834bff39b6e3f96ef Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=20505)
 
   * 5b39c6dc67dbe4273bfcc704f644c925e76f8fa2 UNKNOWN
   
   
   Bot commands
 @hudi-bot supports the following commands:
   
- `@hudi-bot run azure` re-run the last Azure build
   





Re: [PR] [HUDI-6801] Implement merging partial updates from log files for MOR tables [hudi]

2023-10-26 Thread via GitHub


yihua commented on code in PR #9883:
URL: https://github.com/apache/hudi/pull/9883#discussion_r1373981901


##
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/merge/SparkRecordMergingUtils.java:
##
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.merge;
+
+import org.apache.hudi.AvroConversionUtils;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.model.HoodieSparkRecord;
+import org.apache.hudi.common.util.collection.Pair;
+
+import org.apache.avro.Schema;
+import org.apache.spark.sql.HoodieInternalRowUtils;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Util class to merge records that may contain partial updates.
+ * This can be plugged into any Spark {@link HoodieRecordMerger} implementation.
+ */
+public class SparkRecordMergingUtils {
+  private static final Map<Schema, Map<Integer, StructField>> FIELD_ID_TO_FIELD_MAPPING_CACHE = new ConcurrentHashMap<>();
+  private static final Map<Schema, Map<String, Integer>> FIELD_NAME_TO_ID_MAPPING_CACHE = new ConcurrentHashMap<>();
+  private static final Map<Pair<Schema, Schema>,
+      Pair<Map<Integer, StructField>, Pair<StructType, Schema>>> MERGED_SCHEMA_CACHE = new ConcurrentHashMap<>();
+
+  /**
+   * Merges records which can contain partial updates.
+   *
+   * @param older Older {@link HoodieSparkRecord}.
+   * @param oldSchema Old schema.
+   * @param newer Newer {@link HoodieSparkRecord}.
+   * @param newSchema New schema.
+   * @param props Configuration in {@link TypedProperties}.
+   * @return The merged record.
+   */
+  public static Pair<HoodieRecord, Schema> mergeCompleteOrPartialRecords(HoodieSparkRecord older,
+      Schema oldSchema,
+      HoodieSparkRecord newer,
+      Schema newSchema,
+      TypedProperties props) {
+Pair<Map<Integer, StructField>, Pair<StructType, Schema>> mappingSchemaPair =
+getCachedMergedSchema(oldSchema, newSchema);
+boolean isNewerPartial = isPartial(newSchema, mappingSchemaPair.getRight().getRight());
+if (isNewerPartial) {
+  InternalRow oldRow = older.getData();
+  InternalRow newPartialRow = newer.getData();
+
+  Map<Integer, StructField> mergedIdToFieldMapping = mappingSchemaPair.getLeft();
+  Map<String, Integer> newPartialNameToIdMapping = getCachedFieldNameToIdMapping(newSchema);
+  List<Object> values = new ArrayList<>(mergedIdToFieldMapping.size());
+  for (int fieldId = 0; fieldId < mergedIdToFieldMapping.size(); fieldId++) {
+StructField structField = mergedIdToFieldMapping.get(fieldId);
+if (newPartialNameToIdMapping.containsKey(structField.name())) {
+  // pick from new
+  int ordInPartialUpdate = newPartialNameToIdMapping.get(structField.name());

Review Comment:
   Fixed.






Re: [PR] [HUDI-6801] Implement merging partial updates from log files for MOR tables [hudi]

2023-10-26 Thread via GitHub


yihua commented on code in PR #9883:
URL: https://github.com/apache/hudi/pull/9883#discussion_r1373980774


##
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/merge/SparkRecordMergingUtils.java:
##
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.merge;
+
+import org.apache.hudi.AvroConversionUtils;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.model.HoodieSparkRecord;
+import org.apache.hudi.common.util.collection.Pair;
+
+import org.apache.avro.Schema;
+import org.apache.spark.sql.HoodieInternalRowUtils;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Util class to merge records that may contain partial updates.
+ * This can be plugged into any Spark {@link HoodieRecordMerger} implementation.
+ */
+public class SparkRecordMergingUtils {
+  private static final Map<Schema, Map<Integer, StructField>> FIELD_ID_TO_FIELD_MAPPING_CACHE = new ConcurrentHashMap<>();
+  private static final Map<Schema, Map<String, Integer>> FIELD_NAME_TO_ID_MAPPING_CACHE = new ConcurrentHashMap<>();
+  private static final Map<Pair<Schema, Schema>,
+      Pair<Map<Integer, StructField>, Pair<StructType, Schema>>> MERGED_SCHEMA_CACHE = new ConcurrentHashMap<>();
+
+  /**
+   * Merges records which can contain partial updates.
+   *
+   * @param older Older {@link HoodieSparkRecord}.
+   * @param oldSchema Old schema.
+   * @param newer Newer {@link HoodieSparkRecord}.
+   * @param newSchema New schema.
+   * @param props Configuration in {@link TypedProperties}.
+   * @return The merged record.
+   */
+  public static Pair<HoodieRecord, Schema> mergeCompleteOrPartialRecords(HoodieSparkRecord older,
+      Schema oldSchema,
+      HoodieSparkRecord newer,
+      Schema newSchema,
+      TypedProperties props) {
+Pair<Map<Integer, StructField>, Pair<StructType, Schema>> mappingSchemaPair =
+getCachedMergedSchema(oldSchema, newSchema);
+boolean isNewerPartial = isPartial(newSchema, mappingSchemaPair.getRight().getRight());
+if (isNewerPartial) {
+  InternalRow oldRow = older.getData();
+  InternalRow newPartialRow = newer.getData();
+
+  Map<Integer, StructField> mergedIdToFieldMapping = mappingSchemaPair.getLeft();
+  Map<String, Integer> newPartialNameToIdMapping = getCachedFieldNameToIdMapping(newSchema);
+  List<Object> values = new ArrayList<>(mergedIdToFieldMapping.size());
+  for (int fieldId = 0; fieldId < mergedIdToFieldMapping.size(); fieldId++) {
+StructField structField = mergedIdToFieldMapping.get(fieldId);
+if (newPartialNameToIdMapping.containsKey(structField.name())) {
+  // pick from new
+  int ordInPartialUpdate = newPartialNameToIdMapping.get(structField.name());
+  values.add(newPartialRow.get(ordInPartialUpdate, structField.dataType()));
+} else {
+  // pick from old
+  values.add(oldRow.get(fieldId, structField.dataType()));

Review Comment:
   Here `new` updates `old` after the ordering has been determined, based on commit order, event time, or custom logic. Both the old and the new schema can be partial, so there can be intermediate partial updates. For example, consider the following case:
   ```
   base file: schema {col1, col2, col3, col4}
   log1: partial update schema {col2}
   log2: partial update schema {col4}
   ```
   When merging partial update records between log1 (`{col2}`) and log2 
(`{col4}`), the log reader generates partial updates again with schema `{col2, 
col4}` for some records.  Finally there's record merging between base (`{col1, 
col2, col3, col4}`) and logs (`{col2, col4}`). 
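A minimal Java sketch of the chained merge in the example above, assuming each record is modeled as an ordered `Map` from column name to value instead of a Spark `InternalRow`, and that the caller has already decided which record is newer. `PartialMergeSketch`, `mergePartial`, and `record` are hypothetical names for illustration, not Hudi APIs.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical sketch: records are ordered maps of column name -> value (not Hudi's API). */
final class PartialMergeSketch {

  /**
   * Overlays the columns present in {@code newer} onto {@code older}.
   * Columns only in {@code older} keep their old value. Columns only in {@code newer}
   * are appended after the existing ones, because LinkedHashMap keeps insertion order
   * and re-putting an existing key does not move it.
   */
  static Map<String, Object> mergePartial(Map<String, Object> older, Map<String, Object> newer) {
    Map<String, Object> merged = new LinkedHashMap<>(older);
    merged.putAll(newer);
    return merged;
  }

  /** Builds an ordered record from alternating column names and values. */
  static Map<String, Object> record(Object... kv) {
    Map<String, Object> r = new LinkedHashMap<>();
    for (int i = 0; i < kv.length; i += 2) {
      r.put((String) kv[i], kv[i + 1]);
    }
    return r;
  }

  public static void main(String[] args) {
    Map<String, Object> base = record("col1", 1, "col2", "a", "col3", 3.0, "col4", "x");
    Map<String, Object> log1 = record("col2", "b"); // partial update {col2}
    Map<String, Object> log2 = record("col4", "y"); // partial update {col4}

    // Merging the two log records yields another partial record with schema {col2, col4}.
    Map<String, Object> logs = mergePartial(log1, log2);
    System.out.println(logs.keySet()); // [col2, col4]

    // Merging the base record with the combined log record yields a full record again.
    Map<String, Object> full = mergePartial(base, logs);
    System.out.println(full); // {col1=1, col2=b, col3=3.0, col4=y}
  }
}
```

The ordered-map overlay also mirrors the layout question raised in review: fields that exist only in the newer (partial) schema end up after the older schema's fields.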




Re: [PR] [HUDI-6801] Implement merging partial updates from log files for MOR tables [hudi]

2023-10-26 Thread via GitHub


yihua commented on code in PR #9883:
URL: https://github.com/apache/hudi/pull/9883#discussion_r1373978234


##
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/merge/SparkRecordMergingUtils.java:
##
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.merge;
+
+import org.apache.hudi.AvroConversionUtils;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.model.HoodieSparkRecord;
+import org.apache.hudi.common.util.collection.Pair;
+
+import org.apache.avro.Schema;
+import org.apache.spark.sql.HoodieInternalRowUtils;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Util class to merge records that may contain partial updates.
+ * This can be plugged into any Spark {@link HoodieRecordMerger} implementation.
+ */
+public class SparkRecordMergingUtils {
+  private static final Map<Schema, Map<Integer, StructField>> FIELD_ID_TO_FIELD_MAPPING_CACHE = new ConcurrentHashMap<>();
+  private static final Map<Schema, Map<String, Integer>> FIELD_NAME_TO_ID_MAPPING_CACHE = new ConcurrentHashMap<>();
+  private static final Map<Pair<Schema, Schema>,
+      Pair<Map<Integer, StructField>, Pair<StructType, Schema>>> MERGED_SCHEMA_CACHE = new ConcurrentHashMap<>();
+
+  /**
+   * Merges records which can contain partial updates.
+   *
+   * @param older Older {@link HoodieSparkRecord}.
+   * @param oldSchema Old schema.
+   * @param newer Newer {@link HoodieSparkRecord}.
+   * @param newSchema New schema.
+   * @param props Configuration in {@link TypedProperties}.
+   * @return The merged record.
+   */
+  public static Pair<HoodieRecord, Schema> mergeCompleteOrPartialRecords(
+      HoodieSparkRecord older,
+      Schema oldSchema,
+      HoodieSparkRecord newer,
+      Schema newSchema,
+      TypedProperties props) {
+    Pair<Map<Integer, StructField>, Pair<StructType, Schema>> mappingSchemaPair =
+        getCachedMergedSchema(oldSchema, newSchema);
+    boolean isNewerPartial = isPartial(newSchema, mappingSchemaPair.getRight().getRight());
+    if (isNewerPartial) {
+      InternalRow oldRow = older.getData();
+      InternalRow newPartialRow = newer.getData();
+
+      Map<Integer, StructField> mergedIdToFieldMapping = mappingSchemaPair.getLeft();
+      Map<String, Integer> newPartialNameToIdMapping = getCachedFieldNameToIdMapping(newSchema);
+      List<Object> values = new ArrayList<>(mergedIdToFieldMapping.size());
+      for (int fieldId = 0; fieldId < mergedIdToFieldMapping.size(); fieldId++) {
+        StructField structField = mergedIdToFieldMapping.get(fieldId);
+        if (newPartialNameToIdMapping.containsKey(structField.name())) {
+          // pick from new
+          int ordInPartialUpdate = newPartialNameToIdMapping.get(structField.name());
+          values.add(newPartialRow.get(ordInPartialUpdate, structField.dataType()));
+        } else {
+          // pick from old
+          values.add(oldRow.get(fieldId, structField.dataType()));
+        }
+      }
+      InternalRow mergedRow = new GenericInternalRow(values.toArray());
+
+      HoodieSparkRecord mergedSparkRecord = new HoodieSparkRecord(
+          mergedRow, mappingSchemaPair.getRight().getLeft());
+      return Pair.of(mergedSparkRecord, mappingSchemaPair.getRight().getRight());
+    } else {
+      return Pair.of(newer, newSchema);
+    }
+  }
+
+  /**
+   * @param avroSchema Avro schema.
+   * @return The field ID to {@link StructField} instance mapping.
+   */
+  public static Map<Integer, StructField> getCachedFieldIdToFieldMapping(Schema avroSchema) {
+    return FIELD_ID_TO_FIELD_MAPPING_CACHE.computeIfAbsent(avroSchema, schema -> {
+      StructType structType =

Re: [PR] [HUDI-6801] Implement merging partial updates from log files for MOR tables [hudi]

2023-10-26 Thread via GitHub


yihua commented on code in PR #9883:
URL: https://github.com/apache/hudi/pull/9883#discussion_r1373977542


##
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/merge/SparkRecordMergingUtils.java:
##
@@ -0,0 +1,192 @@
+/**
+ * Util class to merge records that may contain partial updates.
+ * This can be plugged into any Spark {@link HoodieRecordMerger} implementation.
+ */
+public class SparkRecordMergingUtils {
+  private static final Map<Schema, Map<Integer, StructField>> FIELD_ID_TO_FIELD_MAPPING_CACHE = new ConcurrentHashMap<>();
+  private static final Map<Schema, Map<String, Integer>> FIELD_NAME_TO_ID_MAPPING_CACHE = new ConcurrentHashMap<>();
+  private static final Map<Pair<Schema, Schema>,
+      Pair<Map<Integer, StructField>, Pair<StructType, Schema>>> MERGED_SCHEMA_CACHE = new ConcurrentHashMap<>();
+
+  /**
+   * Merges records which can contain partial updates.
+   *
+   * @param older Older {@link HoodieSparkRecord}.
+   * @param oldSchema Old schema.
+   * @param newer Newer {@link HoodieSparkRecord}.
+   * @param newSchema New schema.
+   * @param props Configuration in {@link TypedProperties}.
+   * @return The merged record.
+   */
+  public static Pair<HoodieRecord, Schema> mergeCompleteOrPartialRecords(
+      HoodieSparkRecord older,
+      Schema oldSchema,
+      HoodieSparkRecord newer,
+      Schema newSchema,
+      TypedProperties props) {
+    Pair<Map<Integer, StructField>, Pair<StructType, Schema>> mappingSchemaPair =
+        getCachedMergedSchema(oldSchema, newSchema);
+    boolean isNewerPartial = isPartial(newSchema, mappingSchemaPair.getRight().getRight());
+    if (isNewerPartial) {
+      InternalRow oldRow = older.getData();
+      InternalRow newPartialRow = newer.getData();
+
+      Map<Integer, StructField> mergedIdToFieldMapping = mappingSchemaPair.getLeft();
+      Map<String, Integer> newPartialNameToIdMapping = getCachedFieldNameToIdMapping(newSchema);
+      List<Object> values = new ArrayList<>(mergedIdToFieldMapping.size());
+      for (int fieldId = 0; fieldId < mergedIdToFieldMapping.size(); fieldId++) {
+        StructField structField = mergedIdToFieldMapping.get(fieldId);
+        if (newPartialNameToIdMapping.containsKey(structField.name())) {
+          // pick from new
+          int ordInPartialUpdate = newPartialNameToIdMapping.get(structField.name());
+          values.add(newPartialRow.get(ordInPartialUpdate, structField.dataType()));
+        } else {
+          // pick from old
+          values.add(oldRow.get(fieldId, structField.dataType()));
+        }
+      }
+      InternalRow mergedRow = new GenericInternalRow(values.toArray());
+
+      HoodieSparkRecord mergedSparkRecord = new HoodieSparkRecord(
+          mergedRow, mappingSchemaPair.getRight().getLeft());
+      return Pair.of(mergedSparkRecord, mappingSchemaPair.getRight().getRight());
+    } else {
+      return Pair.of(newer, newSchema);
+    }
+  }
+
+  /**
+   * @param avroSchema Avro schema.
+   * @return The field ID to {@link StructField} instance mapping.
+   */
+  public static Map<Integer, StructField> getCachedFieldIdToFieldMapping(Schema avroSchema) {
+    return FIELD_ID_TO_FIELD_MAPPING_CACHE.computeIfAbsent(avroSchema, schema -> {
+      StructType structType =

Re: [PR] [HUDI-6801] Implement merging partial updates from log files for MOR tables [hudi]

2023-10-26 Thread via GitHub


hudi-bot commented on PR #9883:
URL: https://github.com/apache/hudi/pull/9883#issuecomment-1780949936

   
   ## CI report:
   
   * 78480b553a25033b4f642518c92402e6acd181b4 UNKNOWN
   * 10f2ad7f6615a95666204ff834bff39b6e3f96ef Azure: 
[FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=20505)
 
   
   
   Bot commands
 @hudi-bot supports the following commands:
   
- `@hudi-bot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [HUDI-6801] Implement merging partial updates from log files for MOR tables [hudi]

2023-10-26 Thread via GitHub


hudi-bot commented on PR #9883:
URL: https://github.com/apache/hudi/pull/9883#issuecomment-1780794969

   
   ## CI report:
   
   * 78480b553a25033b4f642518c92402e6acd181b4 UNKNOWN
   * 48be011f64ef12129a0af429f247a036e373dd0f Azure: 
[FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=20502)
 
   * 10f2ad7f6615a95666204ff834bff39b6e3f96ef Azure: 
[PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=20505)
 
   
   





Re: [PR] [HUDI-6801] Implement merging partial updates from log files for MOR tables [hudi]

2023-10-26 Thread via GitHub


hudi-bot commented on PR #9883:
URL: https://github.com/apache/hudi/pull/9883#issuecomment-1780764566

   
   ## CI report:
   
   * d5132f11b2cd4fff06c286ef9741dbaa80fa0463 Azure: 
[PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=20500)
 
   * 78480b553a25033b4f642518c92402e6acd181b4 UNKNOWN
   * 48be011f64ef12129a0af429f247a036e373dd0f Azure: 
[FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=20502)
 
   * 10f2ad7f6615a95666204ff834bff39b6e3f96ef Azure: 
[PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=20505)
 
   
   





Re: [PR] [HUDI-6801] Implement merging partial updates from log files for MOR tables [hudi]

2023-10-26 Thread via GitHub


danny0405 commented on code in PR #9883:
URL: https://github.com/apache/hudi/pull/9883#discussion_r1372839566


##
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/merge/SparkRecordMergingUtils.java:
##
@@ -0,0 +1,192 @@
+/**
+ * Util class to merge records that may contain partial updates.
+ * This can be plugged into any Spark {@link HoodieRecordMerger} implementation.
+ */
+public class SparkRecordMergingUtils {
+  private static final Map<Schema, Map<Integer, StructField>> FIELD_ID_TO_FIELD_MAPPING_CACHE = new ConcurrentHashMap<>();
+  private static final Map<Schema, Map<String, Integer>> FIELD_NAME_TO_ID_MAPPING_CACHE = new ConcurrentHashMap<>();
+  private static final Map<Pair<Schema, Schema>,
+      Pair<Map<Integer, StructField>, Pair<StructType, Schema>>> MERGED_SCHEMA_CACHE = new ConcurrentHashMap<>();
+
+  /**
+   * Merges records which can contain partial updates.
+   *
+   * @param older Older {@link HoodieSparkRecord}.
+   * @param oldSchema Old schema.
+   * @param newer Newer {@link HoodieSparkRecord}.
+   * @param newSchema New schema.
+   * @param props Configuration in {@link TypedProperties}.
+   * @return The merged record.
+   */
+  public static Pair<HoodieRecord, Schema> mergeCompleteOrPartialRecords(
+      HoodieSparkRecord older,
+      Schema oldSchema,
+      HoodieSparkRecord newer,
+      Schema newSchema,
+      TypedProperties props) {
+    Pair<Map<Integer, StructField>, Pair<StructType, Schema>> mappingSchemaPair =
+        getCachedMergedSchema(oldSchema, newSchema);
+    boolean isNewerPartial = isPartial(newSchema, mappingSchemaPair.getRight().getRight());
+    if (isNewerPartial) {
+      InternalRow oldRow = older.getData();
+      InternalRow newPartialRow = newer.getData();
+
+      Map<Integer, StructField> mergedIdToFieldMapping = mappingSchemaPair.getLeft();
+      Map<String, Integer> newPartialNameToIdMapping = getCachedFieldNameToIdMapping(newSchema);
+      List<Object> values = new ArrayList<>(mergedIdToFieldMapping.size());
+      for (int fieldId = 0; fieldId < mergedIdToFieldMapping.size(); fieldId++) {
+        StructField structField = mergedIdToFieldMapping.get(fieldId);
+        if (newPartialNameToIdMapping.containsKey(structField.name())) {
+          // pick from new
+          int ordInPartialUpdate = newPartialNameToIdMapping.get(structField.name());
+          values.add(newPartialRow.get(ordInPartialUpdate, structField.dataType()));
+        } else {
+          // pick from old
+          values.add(oldRow.get(fieldId, structField.dataType()));

Review Comment:
   I'm confused about what the old schema is. Is it the full table schema? And why can the new schema's fields be appended to the end of the old schema?






Re: [PR] [HUDI-6801] Implement merging partial updates from log files for MOR tables [hudi]

2023-10-26 Thread via GitHub


danny0405 commented on code in PR #9883:
URL: https://github.com/apache/hudi/pull/9883#discussion_r1372831131


##
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/merge/SparkRecordMergingUtils.java:
##
@@ -0,0 +1,192 @@
+/**
+ * Util class to merge records that may contain partial updates.
+ * This can be plugged into any Spark {@link HoodieRecordMerger} implementation.
+ */
+public class SparkRecordMergingUtils {
+  private static final Map<Schema, Map<Integer, StructField>> FIELD_ID_TO_FIELD_MAPPING_CACHE = new ConcurrentHashMap<>();
+  private static final Map<Schema, Map<String, Integer>> FIELD_NAME_TO_ID_MAPPING_CACHE = new ConcurrentHashMap<>();
+  private static final Map<Pair<Schema, Schema>,
+      Pair<Map<Integer, StructField>, Pair<StructType, Schema>>> MERGED_SCHEMA_CACHE = new ConcurrentHashMap<>();
+
+  /**
+   * Merges records which can contain partial updates.
+   *
+   * @param older Older {@link HoodieSparkRecord}.
+   * @param oldSchema Old schema.
+   * @param newer Newer {@link HoodieSparkRecord}.
+   * @param newSchema New schema.
+   * @param props Configuration in {@link TypedProperties}.
+   * @return The merged record.
+   */
+  public static Pair<HoodieRecord, Schema> mergeCompleteOrPartialRecords(
+      HoodieSparkRecord older,
+      Schema oldSchema,
+      HoodieSparkRecord newer,
+      Schema newSchema,
+      TypedProperties props) {
+    Pair<Map<Integer, StructField>, Pair<StructType, Schema>> mappingSchemaPair =
+        getCachedMergedSchema(oldSchema, newSchema);
+    boolean isNewerPartial = isPartial(newSchema, mappingSchemaPair.getRight().getRight());
+    if (isNewerPartial) {
+      InternalRow oldRow = older.getData();
+      InternalRow newPartialRow = newer.getData();
+
+      Map<Integer, StructField> mergedIdToFieldMapping = mappingSchemaPair.getLeft();
+      Map<String, Integer> newPartialNameToIdMapping = getCachedFieldNameToIdMapping(newSchema);
+      List<Object> values = new ArrayList<>(mergedIdToFieldMapping.size());
+      for (int fieldId = 0; fieldId < mergedIdToFieldMapping.size(); fieldId++) {
+        StructField structField = mergedIdToFieldMapping.get(fieldId);
+        if (newPartialNameToIdMapping.containsKey(structField.name())) {
+          // pick from new
+          int ordInPartialUpdate = newPartialNameToIdMapping.get(structField.name());

Review Comment:
   Calling `get` once and then checking `val != null` is always more efficient than `containsKey` followed by `get`.
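A minimal sketch of the suggested single-lookup pattern, assuming the name-to-ordinal map never stores `null` values (which holds for a map built from schema field names). `SingleLookupSketch`, `twoLookups`, `oneLookup`, and the `-1` "absent" sentinel are hypothetical and only for illustration.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch comparing two map lookups with one (not Hudi code). */
final class SingleLookupSketch {

  // Two lookups: containsKey() hashes and probes for the key, then get() does it again.
  static int twoLookups(Map<String, Integer> nameToOrdinal, String name) {
    if (nameToOrdinal.containsKey(name)) {
      return nameToOrdinal.get(name);
    }
    return -1;
  }

  // One lookup: get() returns null when the key is absent.
  // Only valid because this map never stores null values.
  static int oneLookup(Map<String, Integer> nameToOrdinal, String name) {
    Integer ordinal = nameToOrdinal.get(name);
    return ordinal != null ? ordinal : -1;
  }

  public static void main(String[] args) {
    Map<String, Integer> partialSchema = new HashMap<>();
    partialSchema.put("col2", 0);
    partialSchema.put("col4", 1);
    System.out.println(oneLookup(partialSchema, "col4")); // 1
    System.out.println(oneLookup(partialSchema, "col1")); // -1
  }
}
```

Inside the merge loop this would read `Integer ordInPartialUpdate = newPartialNameToIdMapping.get(structField.name());` followed by `if (ordInPartialUpdate != null) { ... }`, saving one hash lookup per merged field.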






Re: [PR] [HUDI-6801] Implement merging partial updates from log files for MOR tables [hudi]

2023-10-26 Thread via GitHub


danny0405 commented on code in PR #9883:
URL: https://github.com/apache/hudi/pull/9883#discussion_r1372826782


##
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/merge/SparkRecordMergingUtils.java:
##
@@ -0,0 +1,192 @@
+/**
+ * Util class to merge records that may contain partial updates.
+ * This can be plugged into any Spark {@link HoodieRecordMerger} implementation.
+ */
+public class SparkRecordMergingUtils {
+  private static final Map<Schema, Map<Integer, StructField>> FIELD_ID_TO_FIELD_MAPPING_CACHE = new ConcurrentHashMap<>();
+  private static final Map<Schema, Map<String, Integer>> FIELD_NAME_TO_ID_MAPPING_CACHE = new ConcurrentHashMap<>();
+  private static final Map<Pair<Schema, Schema>,
+      Pair<Map<Integer, StructField>, Pair<StructType, Schema>>> MERGED_SCHEMA_CACHE = new ConcurrentHashMap<>();
+
+  /**
+   * Merges records which can contain partial updates.
+   *
+   * @param older Older {@link HoodieSparkRecord}.
+   * @param oldSchema Old schema.
+   * @param newer Newer {@link HoodieSparkRecord}.
+   * @param newSchema New schema.
+   * @param props Configuration in {@link TypedProperties}.
+   * @return The merged record.
+   */
+  public static Pair<HoodieRecord, Schema> mergeCompleteOrPartialRecords(
+      HoodieSparkRecord older,
+      Schema oldSchema,
+      HoodieSparkRecord newer,
+      Schema newSchema,
+      TypedProperties props) {
+    Pair<Map<Integer, StructField>, Pair<StructType, Schema>> mappingSchemaPair =
+        getCachedMergedSchema(oldSchema, newSchema);
+    boolean isNewerPartial = isPartial(newSchema, mappingSchemaPair.getRight().getRight());

Review Comment:
   We can move the `isPartial` check to the first line so that the cache does not need to be computed for regular writes.






Re: [PR] [HUDI-6801] Implement merging partial updates from log files for MOR tables [hudi]

2023-10-26 Thread via GitHub


danny0405 commented on code in PR #9883:
URL: https://github.com/apache/hudi/pull/9883#discussion_r1372822717


##
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/merge/SparkRecordMergingUtils.java:
##
@@ -0,0 +1,192 @@
+/**
+ * Util class to merge records that may contain partial updates.
+ * This can be plugged into any Spark {@link HoodieRecordMerger} implementation.
+ */
+public class SparkRecordMergingUtils {
+  private static final Map<Schema, Map<Integer, StructField>> FIELD_ID_TO_FIELD_MAPPING_CACHE = new ConcurrentHashMap<>();
+  private static final Map<Schema, Map<String, Integer>> FIELD_NAME_TO_ID_MAPPING_CACHE = new ConcurrentHashMap<>();
+  private static final Map<Pair<Schema, Schema>,
+      Pair<Map<Integer, StructField>, Pair<StructType, Schema>>> MERGED_SCHEMA_CACHE = new ConcurrentHashMap<>();
+
+  /**
+   * Merges records which can contain partial updates.
+   *
+   * @param older Older {@link HoodieSparkRecord}.
+   * @param oldSchema Old schema.
+   * @param newer Newer {@link HoodieSparkRecord}.
+   * @param newSchema New schema.
+   * @param props Configuration in {@link TypedProperties}.
+   * @return The merged record.
+   */
+  public static Pair<HoodieRecord, Schema> mergeCompleteOrPartialRecords(
+      HoodieSparkRecord older,
+      Schema oldSchema,
+      HoodieSparkRecord newer,
+      Schema newSchema,
+      TypedProperties props) {
+    Pair<Map<Integer, StructField>, Pair<StructType, Schema>> mappingSchemaPair =
+        getCachedMergedSchema(oldSchema, newSchema);
+    boolean isNewerPartial = isPartial(newSchema, mappingSchemaPair.getRight().getRight());
+    if (isNewerPartial) {
+      InternalRow oldRow = older.getData();
+      InternalRow newPartialRow = newer.getData();
+
+      Map<Integer, StructField> mergedIdToFieldMapping = mappingSchemaPair.getLeft();
+      Map<String, Integer> newPartialNameToIdMapping = getCachedFieldNameToIdMapping(newSchema);
+      List<Object> values = new ArrayList<>(mergedIdToFieldMapping.size());
+      for (int fieldId = 0; fieldId < mergedIdToFieldMapping.size(); fieldId++) {
+        StructField structField = mergedIdToFieldMapping.get(fieldId);
+        if (newPartialNameToIdMapping.containsKey(structField.name())) {
+          // pick from new
+          int ordInPartialUpdate = newPartialNameToIdMapping.get(structField.name());
+          values.add(newPartialRow.get(ordInPartialUpdate, structField.dataType()));
+        } else {
+          // pick from old
+          values.add(oldRow.get(fieldId, structField.dataType()));
+        }
+      }
+      InternalRow mergedRow = new GenericInternalRow(values.toArray());
+
+      HoodieSparkRecord mergedSparkRecord = new HoodieSparkRecord(
+          mergedRow, mappingSchemaPair.getRight().getLeft());
+      return Pair.of(mergedSparkRecord, mappingSchemaPair.getRight().getRight());
+    } else {
+      return Pair.of(newer, newSchema);
+    }
+  }
+
+  /**
+   * @param avroSchema Avro schema.
+   * @return The field ID to {@link StructField} instance mapping.
+   */
+  public static Map<Integer, StructField> getCachedFieldIdToFieldMapping(Schema avroSchema) {
+    return FIELD_ID_TO_FIELD_MAPPING_CACHE.computeIfAbsent(avroSchema, schema -> {
+      StructType structType =

Re: [PR] [HUDI-6801] Implement merging partial updates from log files for MOR tables [hudi]

2023-10-26 Thread via GitHub


danny0405 commented on code in PR #9883:
URL: https://github.com/apache/hudi/pull/9883#discussion_r1372820250


##
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/merge/SparkRecordMergingUtils.java:
##
@@ -0,0 +1,192 @@
+/**
+ * Util class to merge records that may contain partial updates.
+ * This can be plugged into any Spark {@link HoodieRecordMerger} implementation.
+ */
+public class SparkRecordMergingUtils {
+  private static final Map<Schema, Map<Integer, StructField>> FIELD_ID_TO_FIELD_MAPPING_CACHE = new ConcurrentHashMap<>();
+  private static final Map<Schema, Map<String, Integer>> FIELD_NAME_TO_ID_MAPPING_CACHE = new ConcurrentHashMap<>();
+  private static final Map<Pair<Schema, Schema>,
+      Pair<Map<Integer, StructField>, Pair<StructType, Schema>>> MERGED_SCHEMA_CACHE = new ConcurrentHashMap<>();
+
+  /**
+   * Merges records which can contain partial updates.
+   *
+   * @param older Older {@link HoodieSparkRecord}.
+   * @param oldSchema Old schema.
+   * @param newer Newer {@link HoodieSparkRecord}.
+   * @param newSchema New schema.
+   * @param props Configuration in {@link TypedProperties}.
+   * @return The merged record.
+   */
+  public static Pair<HoodieRecord, Schema> mergeCompleteOrPartialRecords(HoodieSparkRecord older,
+                                                                         Schema oldSchema,
+                                                                         HoodieSparkRecord newer,
+                                                                         Schema newSchema,
+                                                                         TypedProperties props) {
+    Pair<Map<Integer, StructField>, Pair<StructType, Schema>> mappingSchemaPair =
+        getCachedMergedSchema(oldSchema, newSchema);
+    boolean isNewerPartial = isPartial(newSchema, mappingSchemaPair.getRight().getRight());
+    if (isNewerPartial) {
+      InternalRow oldRow = older.getData();
+      InternalRow newPartialRow = newer.getData();
+
+      Map<Integer, StructField> mergedIdToFieldMapping = mappingSchemaPair.getLeft();
+      Map<String, Integer> newPartialNameToIdMapping = getCachedFieldNameToIdMapping(newSchema);
+      List<Object> values = new ArrayList<>(mergedIdToFieldMapping.size());
+      for (int fieldId = 0; fieldId < mergedIdToFieldMapping.size(); fieldId++) {
+        StructField structField = mergedIdToFieldMapping.get(fieldId);
+        if (newPartialNameToIdMapping.containsKey(structField.name())) {
+          // pick from new
+          int ordInPartialUpdate = newPartialNameToIdMapping.get(structField.name());
+          values.add(newPartialRow.get(ordInPartialUpdate, structField.dataType()));
+        } else {
+          // pick from old
+          values.add(oldRow.get(fieldId, structField.dataType()));
+        }
+      }
+      InternalRow mergedRow = new GenericInternalRow(values.toArray());
+
+      HoodieSparkRecord mergedSparkRecord = new HoodieSparkRecord(
+          mergedRow, mappingSchemaPair.getRight().getLeft());
+      return Pair.of(mergedSparkRecord, mappingSchemaPair.getRight().getRight());
+    } else {
+      return Pair.of(newer, newSchema);
+    }
+  }
+
+  /**
+   * @param avroSchema Avro schema.
+   * @return The field ID to {@link StructField} instance mapping.
+   */
+  public static Map<Integer, StructField> getCachedFieldIdToFieldMapping(Schema avroSchema) {
+    return FIELD_ID_TO_FIELD_MAPPING_CACHE.computeIfAbsent(avroSchema, schema -> {
+      StructType structType =

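The field-by-field selection in `mergeCompleteOrPartialRecords` can be illustrated without Spark. The sketch below is a simplified, hypothetical model and not the PR's code: a schema is an ordered `List<String>` of field names, a row is an `Object[]` aligned with it, and the names `PartialMergeSketch`, `unionSchema` and `merge` are invented for illustration. The rule for building the merged schema (old fields first, then fields only the new schema has) is an assumption, since the body of `getCachedMergedSchema` is not shown. Unlike the quoted hunk, which reads the old row by merged-schema position, this sketch looks up old values by field name.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, dependency-free model of merging a partial update into a full row. */
final class PartialMergeSketch {

  private PartialMergeSketch() {
  }

  /** Field name -> ordinal lookup (the PR caches an equivalent mapping per schema). */
  static Map<String, Integer> nameToOrdinal(List<String> schema) {
    Map<String, Integer> mapping = new HashMap<>();
    for (int i = 0; i < schema.size(); i++) {
      mapping.put(schema.get(i), i);
    }
    return mapping;
  }

  /** Assumed merge rule: old fields first, then fields that only the new schema has. */
  static List<String> unionSchema(List<String> oldSchema, List<String> newSchema) {
    List<String> merged = new ArrayList<>(oldSchema);
    for (String field : newSchema) {
      if (!merged.contains(field)) {
        merged.add(field);
      }
    }
    return merged;
  }

  /** Returns a row aligned with mergedSchema; fields present in the new row win. */
  static Object[] merge(List<String> mergedSchema,
                        Object[] oldRow, List<String> oldSchema,
                        Object[] newRow, List<String> newSchema) {
    if (newSchema.equals(mergedSchema)) {
      return newRow; // complete record: the newer row is returned unchanged
    }
    Map<String, Integer> oldOrdinals = nameToOrdinal(oldSchema);
    Map<String, Integer> newOrdinals = nameToOrdinal(newSchema);
    Object[] merged = new Object[mergedSchema.size()];
    for (int i = 0; i < mergedSchema.size(); i++) {
      String field = mergedSchema.get(i);
      Integer newOrd = newOrdinals.get(field);
      Integer oldOrd = oldOrdinals.get(field);
      if (newOrd != null) {
        merged[i] = newRow[newOrd];   // pick from new
      } else if (oldOrd != null) {
        merged[i] = oldRow[oldOrd];   // pick from old
      }                               // otherwise leave null: neither row has the field
    }
    return merged;
  }
}
```

A newer row that already covers the whole merged schema short-circuits, which is the same idea as the `else` branch returning `Pair.of(newer, newSchema)` in the hunk.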
Re: [PR] [HUDI-6801] Implement merging partial updates from log files for MOR tables [hudi]

2023-10-26 Thread via GitHub


hudi-bot commented on PR #9883:
URL: https://github.com/apache/hudi/pull/9883#issuecomment-1780692377

   
   ## CI report:
   
   * 6972591365be4bde76c7b41dc5122c63ffd18c79 Azure: 
[FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=20495)
 
   * d5132f11b2cd4fff06c286ef9741dbaa80fa0463 Azure: 
[PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=20500)
 
   * 78480b553a25033b4f642518c92402e6acd181b4 UNKNOWN
   * 48be011f64ef12129a0af429f247a036e373dd0f Azure: 
[PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=20502)
 
   * 10f2ad7f6615a95666204ff834bff39b6e3f96ef Azure: 
[PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=20505)
 
   
   
   Bot commands
 @hudi-bot supports the following commands:
   
- `@hudi-bot run azure` re-run the last Azure build
   





Re: [PR] [HUDI-6801] Implement merging partial updates from log files for MOR tables [hudi]

2023-10-26 Thread via GitHub


hudi-bot commented on PR #9883:
URL: https://github.com/apache/hudi/pull/9883#issuecomment-1780676486

   
   ## CI report:
   
   * 6972591365be4bde76c7b41dc5122c63ffd18c79 Azure: 
[FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=20495)
 
   * d5132f11b2cd4fff06c286ef9741dbaa80fa0463 Azure: 
[PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=20500)
 
   * 78480b553a25033b4f642518c92402e6acd181b4 UNKNOWN
   * 48be011f64ef12129a0af429f247a036e373dd0f Azure: 
[PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=20502)
 
   * 10f2ad7f6615a95666204ff834bff39b6e3f96ef UNKNOWN
   
   
   Bot commands
 @hudi-bot supports the following commands:
   
- `@hudi-bot run azure` re-run the last Azure build
   





Re: [PR] [HUDI-6801] Implement merging partial updates from log files for MOR tables [hudi]

2023-10-26 Thread via GitHub


hudi-bot commented on PR #9883:
URL: https://github.com/apache/hudi/pull/9883#issuecomment-1780663069

   
   ## CI report:
   
   * 6972591365be4bde76c7b41dc5122c63ffd18c79 Azure: 
[FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=20495)
 
   * d5132f11b2cd4fff06c286ef9741dbaa80fa0463 Azure: 
[PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=20500)
 
   * 78480b553a25033b4f642518c92402e6acd181b4 UNKNOWN
   * 48be011f64ef12129a0af429f247a036e373dd0f Azure: 
[PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=20502)
 
   
   
   Bot commands
 @hudi-bot supports the following commands:
   
- `@hudi-bot run azure` re-run the last Azure build
   





Re: [PR] [HUDI-6801] Implement merging partial updates from log files for MOR tables [hudi]

2023-10-26 Thread via GitHub


yihua commented on code in PR #9883:
URL: https://github.com/apache/hudi/pull/9883#discussion_r1372700756


##
hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieKeyBasedFileGroupRecordBuffer.java:
##
@@ -127,10 +124,12 @@ public boolean hasNext() throws IOException {
 
   String recordKey = readerContext.getRecordKey(baseRecord, baseFileSchema);
   Pair<Option<T>, Map<String, Object>> logRecordInfo = records.remove(recordKey);
+  Map<String, Object> metadata = readerContext.generateMetadataForRecord(
+      baseRecord, baseFileSchema, false);

Review Comment:
   We'll do benchmarking on this.  It's likely OK, given that we're not doing much work inside the method and the existing log reader also extracts some metadata for each record.






Re: [PR] [HUDI-6801] Implement merging partial updates from log files for MOR tables [hudi]

2023-10-26 Thread via GitHub


hudi-bot commented on PR #9883:
URL: https://github.com/apache/hudi/pull/9883#issuecomment-1780593944

   
   ## CI report:
   
   * 6972591365be4bde76c7b41dc5122c63ffd18c79 Azure: 
[FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=20495)
 
   * d5132f11b2cd4fff06c286ef9741dbaa80fa0463 Azure: 
[PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=20500)
 
   * 78480b553a25033b4f642518c92402e6acd181b4 UNKNOWN
   * 48be011f64ef12129a0af429f247a036e373dd0f UNKNOWN
   
   
   Bot commands
 @hudi-bot supports the following commands:
   
- `@hudi-bot run azure` re-run the last Azure build
   





Re: [PR] [HUDI-6801] Implement merging partial updates from log files for MOR tables [hudi]

2023-10-26 Thread via GitHub


hudi-bot commented on PR #9883:
URL: https://github.com/apache/hudi/pull/9883#issuecomment-1780580512

   
   ## CI report:
   
   * 6972591365be4bde76c7b41dc5122c63ffd18c79 Azure: 
[FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=20495)
 
   * d5132f11b2cd4fff06c286ef9741dbaa80fa0463 Azure: 
[PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=20500)
 
   * 78480b553a25033b4f642518c92402e6acd181b4 UNKNOWN
   
   
   Bot commands
 @hudi-bot supports the following commands:
   
- `@hudi-bot run azure` re-run the last Azure build
   





Re: [PR] [HUDI-6801] Implement merging partial updates from log files for MOR tables [hudi]

2023-10-26 Thread via GitHub


yihua commented on code in PR #9883:
URL: https://github.com/apache/hudi/pull/9883#discussion_r1372707069


##
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/HoodieSparkRecordMerger.java:
##
@@ -70,9 +71,11 @@ public Option<Pair<HoodieRecord, Schema>> merge(HoodieRecord older, Schema oldSc
   }
 }
 if (older.getOrderingValue(oldSchema, props).compareTo(newer.getOrderingValue(newSchema, props)) > 0) {
-  return Option.of(Pair.of(older, oldSchema));
+  return Option.of(SparkPartialMergingUtils.mergePartialRecords(
+      (HoodieSparkRecord) newer, newSchema, (HoodieSparkRecord) older, oldSchema, props));

Review Comment:
   Makes sense.  Renamed to 
`SparkRecordMergingUtils#mergeCompleteOrPartialRecords`






Re: [PR] [HUDI-6801] Implement merging partial updates from log files for MOR tables [hudi]

2023-10-26 Thread via GitHub


yihua commented on code in PR #9883:
URL: https://github.com/apache/hudi/pull/9883#discussion_r1372700756


##
hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieKeyBasedFileGroupRecordBuffer.java:
##
@@ -127,10 +124,12 @@ public boolean hasNext() throws IOException {
 
   String recordKey = readerContext.getRecordKey(baseRecord, baseFileSchema);
   Pair<Option<T>, Map<String, Object>> logRecordInfo = records.remove(recordKey);
+  Map<String, Object> metadata = readerContext.generateMetadataForRecord(
+      baseRecord, baseFileSchema, false);

Review Comment:
   We'll do benchmarking on this.  It's likely OK, given that we're not doing much work inside the method and the existing log reader also extracts some metadata for each record.






Re: [PR] [HUDI-6801] Implement merging partial updates from log files for MOR tables [hudi]

2023-10-26 Thread via GitHub


yihua commented on code in PR #9883:
URL: https://github.com/apache/hudi/pull/9883#discussion_r1372698435


##
hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java:
##
@@ -160,9 +169,11 @@ public Map<String, Object> generateMetadataForRecord(
    * @param schema The Avro schema of the record.
    * @return A mapping containing the metadata.
    */
-  public Map<String, Object> generateMetadataForRecord(T record, Schema schema) {
+  public Map<String, Object> generateMetadataForRecord(T record, Schema schema, boolean isPartial) {
     Map<String, Object> meta = new HashMap<>();
     meta.put(INTERNAL_META_RECORD_KEY, getRecordKey(record, schema));
+    meta.put(INTERNAL_META_SCHEMA, schema);
+    meta.put(INTERNAL_META_IS_PARTIAL, isPartial);

Review Comment:
   We can.  We can take it up in a separate PR.






Re: [PR] [HUDI-6801] Implement merging partial updates from log files for MOR tables [hudi]

2023-10-26 Thread via GitHub


yihua commented on code in PR #9883:
URL: https://github.com/apache/hudi/pull/9883#discussion_r1372608672


##
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java:
##
@@ -94,16 +94,17 @@ public Comparable getOrderingValue(Option<InternalRow> rowOption,
 
   @Override
   public HoodieRecord constructHoodieRecord(Option<InternalRow> rowOption,
- Map<String, Object> metadataMap,
- Schema schema) {
+ Map<String, Object> metadataMap) {
 if (!rowOption.isPresent()) {
   return new HoodieEmptyRecord<>(
   new HoodieKey((String) metadataMap.get(INTERNAL_META_RECORD_KEY),
   (String) metadataMap.get(INTERNAL_META_PARTITION_PATH)),
   HoodieRecord.HoodieRecordType.SPARK);
 }
 
+Schema schema = (Schema) metadataMap.get(INTERNAL_META_SCHEMA);
 InternalRow row = rowOption.get();
+boolean isPartial = (boolean) metadataMap.getOrDefault(INTERNAL_META_IS_PARTIAL, false);
 return new HoodieSparkRecord(row, HoodieInternalRowUtils.getCachedSchema(schema));

Review Comment:
   Good catch.  Somehow I missed this.  Fixed now.



##
hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java:
##
@@ -151,7 +151,7 @@ public HoodieFileGroupReader(HoodieReaderContext<T> readerContext,
   public void initRecordIterators() {
 this.baseFileIterator = baseFilePath.isPresent()
 ? readerContext.getFileRecordIterator(
-baseFilePath.get().getHadoopPath(), start, length, readerState.baseFileAvroSchema, readerState.baseFileAvroSchema, hadoopConf)
+baseFilePath.get().getHadoopPath(), start, length, readerState.baseFileAvroSchema, readerState.baseFileAvroSchema, hadoopConf)

Review Comment:
   Fixed.



##
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala:
##
@@ -51,6 +61,28 @@ class SparkFileFormatInternalRowReaderContext(baseFileReader: PartitionedFile =>
  requiredSchema: Schema,
  conf: Configuration): ClosableIterator[InternalRow] = {
 val fileInfo = sparkAdapter.getSparkPartitionedFileUtils.createPartitionedFile(partitionValues,
 filePath, start, length)
-new CloseableInternalRowIterator(baseFileReader.apply(fileInfo))
+if (filePath.toString.contains(HoodieLogFile.DELTA_EXTENSION)) {

Review Comment:
   This file path can be an InlineFS URL like 
`inlinefs://path/h3/.80646032-a18e-444f-a7ff-ae518cea8bdb-0_20231026055441030.log.1_0-257-368/file/?start_offset=672=2669`,
 which cannot be properly checked by `FsUtils.isLogFile`.  Should I fix 
`FsUtils.isLogFile`?
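One way to handle the InlineFS case is to unwrap the path before matching the file name. The helper below is hypothetical (`LogPathSketch.isLogFilePath` is not a Hudi API), and its regex is a simplified assumption based on the `.<fileId>_<instant>.log.<version>_<writeToken>` shape of the example URL above, not the pattern `FsUtils.isLogFile` actually uses. It drops the query string, strips a trailing `/file/` segment, and matches only the last path component.

```java
import java.util.regex.Pattern;

/** Hypothetical log-file check that tolerates InlineFS-style wrapping. */
final class LogPathSketch {
  // Assumed naming: ".<fileId>_<instant>.log.<version>" with an optional "_<writeToken>".
  private static final Pattern LOG_FILE_NAME =
      Pattern.compile("^\\.(.+)_(\\d+)\\.log\\.(\\d+)(_[\\d-]+)?$");
  private static final String INLINE_SUFFIX = "/file/";

  private LogPathSketch() {
  }

  static boolean isLogFilePath(String path) {
    String p = path;
    int query = p.indexOf('?');
    if (query >= 0) {
      p = p.substring(0, query);  // drop "?start_offset=..." and similar
    }
    if (p.endsWith(INLINE_SUFFIX)) {
      p = p.substring(0, p.length() - INLINE_SUFFIX.length());  // unwrap the InlineFS suffix
    }
    String fileName = p.substring(p.lastIndexOf('/') + 1);
    return LOG_FILE_NAME.matcher(fileName).matches();
  }
}
```

Matching on the unwrapped file name rather than `contains(HoodieLogFile.DELTA_EXTENSION)` avoids false positives from directory names that happen to contain `.log`.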






Re: [PR] [HUDI-6801] Implement merging partial updates from log files for MOR tables [hudi]

2023-10-25 Thread via GitHub


hudi-bot commented on PR #9883:
URL: https://github.com/apache/hudi/pull/9883#issuecomment-1780459956

   
   ## CI report:
   
   * 6972591365be4bde76c7b41dc5122c63ffd18c79 Azure: 
[FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=20495)
 
   * d5132f11b2cd4fff06c286ef9741dbaa80fa0463 Azure: 
[PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=20500)
 
   
   
   Bot commands
 @hudi-bot supports the following commands:
   
- `@hudi-bot run azure` re-run the last Azure build
   





Re: [PR] [HUDI-6801] Implement merging partial updates from log files for MOR tables [hudi]

2023-10-25 Thread via GitHub


hudi-bot commented on PR #9883:
URL: https://github.com/apache/hudi/pull/9883#issuecomment-1780453442

   
   ## CI report:
   
   * 6972591365be4bde76c7b41dc5122c63ffd18c79 Azure: 
[FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=20495)
 
   * d5132f11b2cd4fff06c286ef9741dbaa80fa0463 UNKNOWN
   
   
   Bot commands
 @hudi-bot supports the following commands:
   
- `@hudi-bot run azure` re-run the last Azure build
   





Re: [PR] [HUDI-6801] Implement merging partial updates from log files for MOR tables [hudi]

2023-10-25 Thread via GitHub


danny0405 commented on code in PR #9883:
URL: https://github.com/apache/hudi/pull/9883#discussion_r1372535469


##
hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java:
##
@@ -160,9 +169,11 @@ public Map<String, Object> generateMetadataForRecord(
    * @param schema The Avro schema of the record.
    * @return A mapping containing the metadata.
    */
-  public Map<String, Object> generateMetadataForRecord(T record, Schema schema) {
+  public Map<String, Object> generateMetadataForRecord(T record, Schema schema, boolean isPartial) {
     Map<String, Object> meta = new HashMap<>();
     meta.put(INTERNAL_META_RECORD_KEY, getRecordKey(record, schema));
+    meta.put(INTERNAL_META_SCHEMA, schema);
+    meta.put(INTERNAL_META_IS_PARTIAL, isPartial);

Review Comment:
   I'm wondering whether we can represent the metadata as a POJO to make the 
interface more explicit and clear.
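A POJO along the lines suggested could look like the minimal sketch below. `RecordMetadata` and its accessors are hypothetical names, and the type parameter `S` stands in for Avro's `Schema` only to keep the example dependency-free. Typed getters would replace casts such as `(Schema) metadataMap.get(INTERNAL_META_SCHEMA)` and `(boolean) metadataMap.getOrDefault(INTERNAL_META_IS_PARTIAL, false)`.

```java
import java.util.Objects;

/** Hypothetical typed replacement for the Map<String, Object> record metadata. */
final class RecordMetadata<S> {
  private final String recordKey;
  private final String partitionPath;  // may be null when not known
  private final S schema;
  private final boolean partial;

  RecordMetadata(String recordKey, String partitionPath, S schema, boolean partial) {
    this.recordKey = Objects.requireNonNull(recordKey, "recordKey");
    this.partitionPath = partitionPath;
    this.schema = Objects.requireNonNull(schema, "schema");
    this.partial = partial;
  }

  String getRecordKey() {
    return recordKey;
  }

  String getPartitionPath() {
    return partitionPath;
  }

  S getSchema() {
    return schema;
  }

  boolean isPartial() {
    return partial;
  }
}
```

Compared with a string-keyed map, missing or mistyped entries surface at compile time or construction time instead of as a `ClassCastException` or `null` during merging.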



##
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java:
##
@@ -94,16 +94,17 @@ public Comparable getOrderingValue(Option<InternalRow> rowOption,
 
   @Override
   public HoodieRecord constructHoodieRecord(Option<InternalRow> rowOption,
- Map<String, Object> metadataMap,
- Schema schema) {
+ Map<String, Object> metadataMap) {
 if (!rowOption.isPresent()) {
   return new HoodieEmptyRecord<>(
   new HoodieKey((String) metadataMap.get(INTERNAL_META_RECORD_KEY),
   (String) metadataMap.get(INTERNAL_META_PARTITION_PATH)),
   HoodieRecord.HoodieRecordType.SPARK);
 }
 
+Schema schema = (Schema) metadataMap.get(INTERNAL_META_SCHEMA);
 InternalRow row = rowOption.get();
+boolean isPartial = (boolean) metadataMap.getOrDefault(INTERNAL_META_IS_PARTIAL, false);
 return new HoodieSparkRecord(row, HoodieInternalRowUtils.getCachedSchema(schema));

Review Comment:
   The `isPartial` is never used.



##
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala:
##
@@ -51,6 +61,28 @@ class SparkFileFormatInternalRowReaderContext(baseFileReader: PartitionedFile =>
  requiredSchema: Schema,
  conf: Configuration): ClosableIterator[InternalRow] = {
 val fileInfo = sparkAdapter.getSparkPartitionedFileUtils.createPartitionedFile(partitionValues,
 filePath, start, length)
-new CloseableInternalRowIterator(baseFileReader.apply(fileInfo))
+if (filePath.toString.contains(HoodieLogFile.DELTA_EXTENSION)) {

Review Comment:
   Use `FsUtils.isLogFile` instead.



##
hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java:
##
@@ -151,7 +151,7 @@ public HoodieFileGroupReader(HoodieReaderContext<T> readerContext,
   public void initRecordIterators() {
 this.baseFileIterator = baseFilePath.isPresent()
 ? readerContext.getFileRecordIterator(
-baseFilePath.get().getHadoopPath(), start, length, readerState.baseFileAvroSchema, readerState.baseFileAvroSchema, hadoopConf)
+baseFilePath.get().getHadoopPath(), start, length, readerState.baseFileAvroSchema, readerState.baseFileAvroSchema, hadoopConf)

Review Comment:
   Unnecessary change?



##
hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieKeyBasedFileGroupRecordBuffer.java:
##
@@ -127,10 +124,12 @@ public boolean hasNext() throws IOException {
 
   String recordKey = readerContext.getRecordKey(baseRecord, baseFileSchema);
   Pair<Option<T>, Map<String, Object>> logRecordInfo = records.remove(recordKey);
+  Map<String, Object> metadata = readerContext.generateMetadataForRecord(
+      baseRecord, baseFileSchema, false);

Review Comment:
   Be cautious about a possible performance regression from the per-record metadata construction.



##
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/HoodieSparkRecordMerger.java:
##
@@ -70,9 +71,11 @@ public Option<Pair<HoodieRecord, Schema>> merge(HoodieRecord older, Schema oldSc
   }
 }
 if (older.getOrderingValue(oldSchema, props).compareTo(newer.getOrderingValue(newSchema, props)) > 0) {
-  return Option.of(Pair.of(older, oldSchema));
+  return Option.of(SparkPartialMergingUtils.mergePartialRecords(
+      (HoodieSparkRecord) newer, newSchema, (HoodieSparkRecord) older, oldSchema, props));

Review Comment:
   The partial merge may not happen, so maybe give the utility a better name.
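The ordering rule in the hunk above, combined with partial merging, can be modeled with plain maps. In this hypothetical sketch (`OrderingMergeSketch` is not Hudi code), a record is a field-name-to-value map and an absent key means the field was not part of a partial update. The `> 0` branch mirrors the quoted hunk, where the older record wins and is passed in the "newer" position so its fields take precedence; the other branch is an assumption, because it is not shown in the diff. It also illustrates the naming concern: when the winner carries every field, no partial merge effectively happens and the winner simply replaces the other record.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical ordering-aware merge of possibly-partial records modeled as maps. */
final class OrderingMergeSketch {

  private OrderingMergeSketch() {
  }

  /** Copies {@code base}, then lets every field present in {@code winner} override it. */
  static Map<String, Object> overlay(Map<String, Object> base, Map<String, Object> winner) {
    Map<String, Object> merged = new HashMap<>(base);
    merged.putAll(winner);
    return merged;
  }

  /** The record with the strictly greater ordering value wins; ties go to the newer record. */
  static <O extends Comparable<O>> Map<String, Object> merge(
      Map<String, Object> older, O olderOrdering,
      Map<String, Object> newer, O newerOrdering) {
    if (olderOrdering.compareTo(newerOrdering) > 0) {
      return overlay(newer, older);  // older wins; gaps filled from newer
    }
    return overlay(older, newer);    // newer wins; gaps filled from older
  }
}
```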






Re: [PR] [HUDI-6801] Implement merging partial updates from log files for MOR tables [hudi]

2023-10-25 Thread via GitHub


hudi-bot commented on PR #9883:
URL: https://github.com/apache/hudi/pull/9883#issuecomment-1780282051

   
   ## CI report:
   
   * 6972591365be4bde76c7b41dc5122c63ffd18c79 Azure: 
[FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=20495)
 
   
   
   Bot commands
 @hudi-bot supports the following commands:
   
- `@hudi-bot run azure` re-run the last Azure build
   





Re: [PR] [HUDI-6801] Implement merging partial updates from log files for MOR tables [hudi]

2023-10-25 Thread via GitHub


yihua commented on code in PR #9883:
URL: https://github.com/apache/hudi/pull/9883#discussion_r1372442062


##
hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java:
##
@@ -126,12 +128,13 @@ protected Option doProcessNextDataRecord(T record,
   // Merge and store the combined record
   // Note that the incoming `record` is from an older commit, so it should be put as
   // the `older` in the merge API
+
   HoodieRecord combinedRecord = (HoodieRecord) recordMerger.merge(
-  readerContext.constructHoodieRecord(Option.of(record), metadata, readerSchema),
-  readerSchema,
+  readerContext.constructHoodieRecord(Option.of(record), metadata),
+  (Schema) metadata.get(INTERNAL_META_SCHEMA),
   readerContext.constructHoodieRecord(
-  existingRecordMetadataPair.getLeft(), existingRecordMetadataPair.getRight(), readerSchema),
-  readerSchema,
+  existingRecordMetadataPair.getLeft(), existingRecordMetadataPair.getRight()),
+  (Schema) existingRecordMetadataPair.getRight().get(INTERNAL_META_SCHEMA),
   payloadProps).get().getLeft();

Review Comment:
   To clarify, when reading log files, the reader schema is fetched from the log block header.  Here we're doing record-level merging.  Depending on the log file from which the records come, the schema could be different.  However, the schema is held by reference: it is the same instance that is passed from the log reader.






Re: [PR] [HUDI-6801] Implement merging partial updates from log files for MOR tables [hudi]

2023-10-25 Thread via GitHub


hudi-bot commented on PR #9883:
URL: https://github.com/apache/hudi/pull/9883#issuecomment-1780214298

   
   ## CI report:
   
   * 085a8583eb56ff4b8d3afa3636c657b11d0db92f Azure: 
[FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=20491)
 
   * 6972591365be4bde76c7b41dc5122c63ffd18c79 Azure: 
[PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=20495)
 
   
   
   Bot commands
 @hudi-bot supports the following commands:
   
- `@hudi-bot run azure` re-run the last Azure build
   





Re: [PR] [HUDI-6801] Implement merging partial updates from log files for MOR tables [hudi]

2023-10-25 Thread via GitHub


hudi-bot commented on PR #9883:
URL: https://github.com/apache/hudi/pull/9883#issuecomment-1780208856

   
   ## CI report:
   
   * 085a8583eb56ff4b8d3afa3636c657b11d0db92f Azure: 
[FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=20491)
 
   * 6972591365be4bde76c7b41dc5122c63ffd18c79 UNKNOWN
   
   
   Bot commands
 @hudi-bot supports the following commands:
   
- `@hudi-bot run azure` re-run the last Azure build
   





Re: [PR] [HUDI-6801] Implement merging partial updates from log files for MOR tables [hudi]

2023-10-25 Thread via GitHub


hudi-bot commented on PR #9883:
URL: https://github.com/apache/hudi/pull/9883#issuecomment-1780111072

   
   ## CI report:
   
   * 085a8583eb56ff4b8d3afa3636c657b11d0db92f Azure: 
[FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=20491)
 
   
   
   Bot commands
 @hudi-bot supports the following commands:
   
- `@hudi-bot run azure` re-run the last Azure build
   





Re: [PR] [HUDI-6801] Implement merging partial updates from log files for MOR tables [hudi]

2023-10-25 Thread via GitHub


hudi-bot commented on PR #9883:
URL: https://github.com/apache/hudi/pull/9883#issuecomment-1780101745

   
   ## CI report:
   
   * 57481f626caf8864def8394c57316535fa490b90 Azure: 
[FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=20490)
 
   * 085a8583eb56ff4b8d3afa3636c657b11d0db92f UNKNOWN
   
   
   Bot commands
 @hudi-bot supports the following commands:
   
- `@hudi-bot run azure` re-run the last Azure build
   





Re: [PR] [HUDI-6801] Implement merging partial updates from log files for MOR tables [hudi]

2023-10-25 Thread via GitHub


hudi-bot commented on PR #9883:
URL: https://github.com/apache/hudi/pull/9883#issuecomment-1780038520

   
   ## CI report:
   
   * 57481f626caf8864def8394c57316535fa490b90 Azure: 
[FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=20490)
 
   
   
   Bot commands
 @hudi-bot supports the following commands:
   
- `@hudi-bot run azure` re-run the last Azure build
   





Re: [PR] [HUDI-6801] Implement merging partial updates from log files for MOR tables [hudi]

2023-10-25 Thread via GitHub


yihua commented on code in PR #9883:
URL: https://github.com/apache/hudi/pull/9883#discussion_r1372307509


##
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java:
##
@@ -195,6 +206,10 @@ public HoodieKey getKey() {
 return key;
   }
 
+  public boolean isPartial() {
+return isPartial;

Review Comment:
   I removed all changes to `HoodieRecord` and its subclasses.  Now whether a record is partial or not is determined by the attached schema, which is per log file.  Checking whether a schema is partial also leverages a cache (see `SparkPartialMergingUtils`), so there is no overhead.

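The caching described in this comment can be sketched as a map keyed by the (old schema, new schema) pair, so that the merged schema and the partial flag are computed once per schema pair instead of once per record. This is a hypothetical, dependency-free model: `MergedSchemaCacheSketch` and `MergeInfo` are invented names, schemas are modeled as lists of field names rather than Avro `Schema` objects, and the union rule for the merged schema is an assumption. It only loosely follows the `MERGED_SCHEMA_CACHE` field declared in `SparkRecordMergingUtils`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical per-schema-pair cache of merge information. */
final class MergedSchemaCacheSketch {

  /** Cached result: the merged field list and whether the new schema is partial. */
  static final class MergeInfo {
    final List<String> mergedSchema;
    final boolean newIsPartial;

    MergeInfo(List<String> mergedSchema, boolean newIsPartial) {
      this.mergedSchema = Collections.unmodifiableList(mergedSchema);
      this.newIsPartial = newIsPartial;
    }
  }

  // Keyed by [oldSchema, newSchema]; List equals/hashCode are element-wise.
  private static final Map<List<List<String>>, MergeInfo> CACHE = new ConcurrentHashMap<>();
  // Counts cache misses, only to make the caching observable in this sketch.
  static final AtomicInteger COMPUTATIONS = new AtomicInteger();

  private MergedSchemaCacheSketch() {
  }

  static MergeInfo get(List<String> oldSchema, List<String> newSchema) {
    List<List<String>> key = List.of(oldSchema, newSchema);
    return CACHE.computeIfAbsent(key, k -> {
      COMPUTATIONS.incrementAndGet();
      List<String> merged = new ArrayList<>(oldSchema);
      for (String field : newSchema) {
        if (!merged.contains(field)) {
          merged.add(field);
        }
      }
      // The new schema is partial if it lacks any field of the merged schema.
      return new MergeInfo(merged, !newSchema.containsAll(merged));
    });
  }
}
```

The key lists must not be mutated after insertion, since their hash codes would change; immutable `List.of(...)` schemas avoid that.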





Re: [PR] [HUDI-6801] Implement merging partial updates from log files for MOR tables [hudi]

2023-10-25 Thread via GitHub


hudi-bot commented on PR #9883:
URL: https://github.com/apache/hudi/pull/9883#issuecomment-1780026429

   
   ## CI report:
   
   * 985e9f099aff341d7d0cec4384ef82b7dcdd4de8 Azure: 
[FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=20469)
 
   * 57481f626caf8864def8394c57316535fa490b90 UNKNOWN
   
   
   Bot commands
 @hudi-bot supports the following commands:
   
- `@hudi-bot run azure` re-run the last Azure build
   





Re: [PR] [HUDI-6801] Implement merging partial updates from log files for MOR tables [hudi]

2023-10-25 Thread via GitHub


yihua commented on code in PR #9883:
URL: https://github.com/apache/hudi/pull/9883#discussion_r1372304952


##
hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java:
##
@@ -67,6 +70,7 @@ public abstract class HoodieReaderContext {
    * file.
    *
    * @param filePath   {@link Path} instance of a file.
+   * @param isLogFile  Whether this is a log file.
    * @param start  Starting byte to start reading.

Review Comment:
   Fixed.






Re: [PR] [HUDI-6801] Implement merging partial updates from log files for MOR tables [hudi]

2023-10-24 Thread via GitHub


hudi-bot commented on PR #9883:
URL: https://github.com/apache/hudi/pull/9883#issuecomment-1778455957

   
   ## CI report:
   
   * 985e9f099aff341d7d0cec4384ef82b7dcdd4de8 Azure: 
[FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=20469)
 
   
   



Re: [PR] [HUDI-6801] Implement merging partial updates from log files for MOR tables [hudi]

2023-10-24 Thread via GitHub


hudi-bot commented on PR #9883:
URL: https://github.com/apache/hudi/pull/9883#issuecomment-1778352024

   
   ## CI report:
   
   * c140ff462f58b649d45c782ce072b683cd908c1c Azure: 
[FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=20441)
 
   * 985e9f099aff341d7d0cec4384ef82b7dcdd4de8 Azure: 
[PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=20469)
 
   
   



Re: [PR] [HUDI-6801] Implement merging partial updates from log files for MOR tables [hudi]

2023-10-24 Thread via GitHub


hudi-bot commented on PR #9883:
URL: https://github.com/apache/hudi/pull/9883#issuecomment-1778343885

   
   ## CI report:
   
   * c140ff462f58b649d45c782ce072b683cd908c1c Azure: 
[FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=20441)
 
   * 985e9f099aff341d7d0cec4384ef82b7dcdd4de8 UNKNOWN
   
   



Re: [PR] [HUDI-6801] Implement merging partial updates from log files for MOR tables [hudi]

2023-10-23 Thread via GitHub


danny0405 commented on code in PR #9883:
URL: https://github.com/apache/hudi/pull/9883#discussion_r1369448954


##
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java:
##
@@ -195,6 +206,10 @@ public HoodieKey getKey() {
     return key;
   }
 
+  public boolean isPartial() {
+    return isPartial;

Review Comment:
   -1, does not make sense






Re: [PR] [HUDI-6801] Implement merging partial updates from log files for MOR tables [hudi]

2023-10-23 Thread via GitHub


danny0405 commented on code in PR #9883:
URL: https://github.com/apache/hudi/pull/9883#discussion_r1369448429


##
hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java:
##
@@ -126,12 +128,13 @@ protected Option doProcessNextDataRecord(T record,
       // Merge and store the combined record
       // Note that the incoming `record` is from an older commit, so it should be put as
       // the `older` in the merge API
+
       HoodieRecord combinedRecord = (HoodieRecord) recordMerger.merge(
-          readerContext.constructHoodieRecord(Option.of(record), metadata, readerSchema),
-          readerSchema,
+          readerContext.constructHoodieRecord(Option.of(record), metadata),
+          (Schema) metadata.get(INTERNAL_META_SCHEMA),
           readerContext.constructHoodieRecord(
-              existingRecordMetadataPair.getLeft(), existingRecordMetadataPair.getRight(), readerSchema),
-          readerSchema,
+              existingRecordMetadataPair.getLeft(), existingRecordMetadataPair.getRight()),
+          (Schema) existingRecordMetadataPair.getRight().get(INTERNAL_META_SCHEMA),
           payloadProps).get().getLeft();

Review Comment:
   But it is specific per file at least, right? Then we can initialize it each 
time the reader prepares to read a new file.






Re: [PR] [HUDI-6801] Implement merging partial updates from log files for MOR tables [hudi]

2023-10-23 Thread via GitHub


danny0405 commented on code in PR #9883:
URL: https://github.com/apache/hudi/pull/9883#discussion_r1369448429


##
hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java:
##
@@ -126,12 +128,13 @@ protected Option doProcessNextDataRecord(T record,
       // Merge and store the combined record
       // Note that the incoming `record` is from an older commit, so it should be put as
       // the `older` in the merge API
+
       HoodieRecord combinedRecord = (HoodieRecord) recordMerger.merge(
-          readerContext.constructHoodieRecord(Option.of(record), metadata, readerSchema),
-          readerSchema,
+          readerContext.constructHoodieRecord(Option.of(record), metadata),
+          (Schema) metadata.get(INTERNAL_META_SCHEMA),
           readerContext.constructHoodieRecord(
-              existingRecordMetadataPair.getLeft(), existingRecordMetadataPair.getRight(), readerSchema),
-          readerSchema,
+              existingRecordMetadataPair.getLeft(), existingRecordMetadataPair.getRight()),
+          (Schema) existingRecordMetadataPair.getRight().get(INTERNAL_META_SCHEMA),
           payloadProps).get().getLeft();

Review Comment:
   But it is specific per file at least, right?






Re: [PR] [HUDI-6801] Implement merging partial updates from log files for MOR tables [hudi]

2023-10-23 Thread via GitHub


yihua commented on code in PR #9883:
URL: https://github.com/apache/hudi/pull/9883#discussion_r1369126255


##
hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java:
##
@@ -126,12 +128,13 @@ protected Option doProcessNextDataRecord(T record,
       // Merge and store the combined record
       // Note that the incoming `record` is from an older commit, so it should be put as
       // the `older` in the merge API
+
       HoodieRecord combinedRecord = (HoodieRecord) recordMerger.merge(
-          readerContext.constructHoodieRecord(Option.of(record), metadata, readerSchema),
-          readerSchema,
+          readerContext.constructHoodieRecord(Option.of(record), metadata),
+          (Schema) metadata.get(INTERNAL_META_SCHEMA),
           readerContext.constructHoodieRecord(
-              existingRecordMetadataPair.getLeft(), existingRecordMetadataPair.getRight(), readerSchema),
-          readerSchema,
+              existingRecordMetadataPair.getLeft(), existingRecordMetadataPair.getRight()),
+          (Schema) existingRecordMetadataPair.getRight().get(INTERNAL_META_SCHEMA),
           payloadProps).get().getLeft();

Review Comment:
   When there are multiple log files, partial updates, and schema evolution, 
`(Schema) metadata.get(INTERNAL_META_SCHEMA)` can differ across record 
keys.
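To illustrate the point above, here is a minimal sketch of a key-based buffer filled from several log files. `SchemaAwareBuffer` is hypothetical and is not `HoodieBaseFileGroupRecordBuffer`; it assumes records are modeled as field-name maps, that entries are fed in oldest-to-newest order, and that a partial update overwrites only the fields in its own schema. Since two entries for the same key can come from files with different schemas, the schema travels with each entry rather than being one buffer-wide `readerSchema`.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model for illustration: each buffered entry keeps the schema of the file it came from.
final class SchemaAwareBuffer {
  // A record's values plus the field list (schema) it was written with.
  record Entry(List<String> schema, Map<String, Object> values) {}

  private final Map<String, Entry> byKey = new HashMap<>();

  // Assumption: calls arrive oldest to newest, so `incoming` is the newer version of the key.
  void put(String key, List<String> schema, Map<String, Object> values) {
    Entry incoming = new Entry(List.copyOf(schema), new LinkedHashMap<>(values));
    byKey.merge(key, incoming, SchemaAwareBuffer::mergeNewerOntoOlder);
  }

  // Overlay only the fields named in the newer entry's schema; keep all other fields from the older one.
  private static Entry mergeNewerOntoOlder(Entry older, Entry newer) {
    LinkedHashMap<String, Object> merged = new LinkedHashMap<>(older.values());
    for (String field : newer.schema()) {
      merged.put(field, newer.values().get(field));
    }
    // Merged schema: older fields in their original order, plus any fields only the newer schema has.
    return new Entry(List.copyOf(merged.keySet()), merged);
  }

  Entry get(String key) {
    return byKey.get(key);
  }
}
```

If a single schema were shared by the whole buffer, the partial entry above would be interpreted with the full schema (or vice versa), which is the mismatch the per-record schema avoids.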






Re: [PR] [HUDI-6801] Implement merging partial updates from log files for MOR tables [hudi]

2023-10-23 Thread via GitHub


yihua commented on code in PR #9883:
URL: https://github.com/apache/hudi/pull/9883#discussion_r1369119288


##
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java:
##
@@ -94,17 +94,18 @@ public Comparable getOrderingValue(Option rowOption,
 
   @Override
   public HoodieRecord constructHoodieRecord(Option rowOption,
-                                            Map metadataMap,
-                                            Schema schema) {
+                                            Map metadataMap) {
     if (!rowOption.isPresent()) {
       return new HoodieEmptyRecord<>(
           new HoodieKey((String) metadataMap.get(INTERNAL_META_RECORD_KEY),
               (String) metadataMap.get(INTERNAL_META_PARTITION_PATH)),
           HoodieRecord.HoodieRecordType.SPARK);
     }
 
+    Schema schema = (Schema) metadataMap.get(INTERNAL_META_SCHEMA);
     InternalRow row = rowOption.get();
-    return new HoodieSparkRecord(row, HoodieInternalRowUtils.getCachedSchema(schema));
+    boolean isPartial = (boolean) metadataMap.getOrDefault(INTERNAL_META_IS_PARTIAL, false);
+    return new HoodieSparkRecord(row, HoodieInternalRowUtils.getCachedSchema(schema), isPartial);

Review Comment:
   Reason mentioned above.






Re: [PR] [HUDI-6801] Implement merging partial updates from log files for MOR tables [hudi]

2023-10-23 Thread via GitHub


yihua commented on code in PR #9883:
URL: https://github.com/apache/hudi/pull/9883#discussion_r1369118091


##
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java:
##
@@ -195,6 +206,10 @@ public HoodieKey getKey() {
     return key;
   }
 
+  public boolean isPartial() {
+    return isPartial;

Review Comment:
   `isPartial` is determined at the commit or write batch level, but for record 
merging to work in the current implementation and maintain the layering, it's 
better to have the flag at the record level.






Re: [PR] [HUDI-6801] Implement merging partial updates from log files for MOR tables [hudi]

2023-10-23 Thread via GitHub


danny0405 commented on code in PR #9883:
URL: https://github.com/apache/hudi/pull/9883#discussion_r1368210408


##
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java:
##
@@ -195,6 +206,10 @@ public HoodieKey getKey() {
     return key;
   }
 
+  public boolean isPartial() {
+    return isPartial;

Review Comment:
   Not sure why we store the `isPartial` flag at the row level. At a high 
level, `isPartial` should have at least the granularity of a commit or a write batch.



##
hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java:
##
@@ -67,6 +70,7 @@ public abstract class HoodieReaderContext {
    * file.
    *
    * @param filePath   {@link Path} instance of a file.
+   * @param isLogFile  Whether this is a log file.
    * @param start  Starting byte to start reading.

Review Comment:
   Can we infer the `isLogFile` flag directly from the file extension?
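A minimal sketch of how the flag could be derived from the file name alone, assuming log files follow the naming `.<fileId>_<baseCommitTime>.log.<version>_<writeToken>` (a hidden file with a `.log.<version>` segment). Under that assumption, a plain `endsWith(".log")` check would not suffice, because the version and write token come after the `.log` segment. `LogFileNames.isLogFile` is a hypothetical helper, not a Hudi API, and the pattern only approximates the naming convention.

```java
import java.util.regex.Pattern;

// Hypothetical helper for illustration; not Hudi's API.
// Assumed log file name shape: ".<fileId>_<baseCommitTime>.log.<version>[_<writeToken>]".
final class LogFileNames {
  private static final Pattern LOG_FILE_NAME =
      Pattern.compile("^\\.(.+)_(.+)\\.log\\.(\\d+)(_.+)?$");

  private LogFileNames() {}

  // Accepts a full path or a bare file name; only the last path segment is inspected.
  static boolean isLogFile(String path) {
    String name = path.substring(path.lastIndexOf('/') + 1);
    return LOG_FILE_NAME.matcher(name).matches();
  }
}
```

Passing an explicit flag avoids tying the reader context to a naming convention, while inferring it keeps the method signature smaller; the sketch only shows that inference is feasible if the convention holds.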



##
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java:
##
@@ -94,17 +94,18 @@ public Comparable getOrderingValue(Option rowOption,
 
   @Override
   public HoodieRecord constructHoodieRecord(Option rowOption,
-                                            Map metadataMap,
-                                            Schema schema) {
+                                            Map metadataMap) {
     if (!rowOption.isPresent()) {
       return new HoodieEmptyRecord<>(
           new HoodieKey((String) metadataMap.get(INTERNAL_META_RECORD_KEY),
               (String) metadataMap.get(INTERNAL_META_PARTITION_PATH)),
           HoodieRecord.HoodieRecordType.SPARK);
     }
 
+    Schema schema = (Schema) metadataMap.get(INTERNAL_META_SCHEMA);
     InternalRow row = rowOption.get();
-    return new HoodieSparkRecord(row, HoodieInternalRowUtils.getCachedSchema(schema));
+    boolean isPartial = (boolean) metadataMap.getOrDefault(INTERNAL_META_IS_PARTIAL, false);
+    return new HoodieSparkRecord(row, HoodieInternalRowUtils.getCachedSchema(schema), isPartial);

Review Comment:
   Can you explain at a high level why we need to know whether a record comes from 
a partial update? Isn't partial update an attribute of the commit?



##
hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java:
##
@@ -126,12 +128,13 @@ protected Option doProcessNextDataRecord(T record,
       // Merge and store the combined record
       // Note that the incoming `record` is from an older commit, so it should be put as
       // the `older` in the merge API
+
       HoodieRecord combinedRecord = (HoodieRecord) recordMerger.merge(
-          readerContext.constructHoodieRecord(Option.of(record), metadata, readerSchema),
-          readerSchema,
+          readerContext.constructHoodieRecord(Option.of(record), metadata),
+          (Schema) metadata.get(INTERNAL_META_SCHEMA),
           readerContext.constructHoodieRecord(
-              existingRecordMetadataPair.getLeft(), existingRecordMetadataPair.getRight(), readerSchema),
-          readerSchema,
+              existingRecordMetadataPair.getLeft(), existingRecordMetadataPair.getRight()),
+          (Schema) existingRecordMetadataPair.getRight().get(INTERNAL_META_SCHEMA),
           payloadProps).get().getLeft();

Review Comment:
   Why not just initialize the `readerSchema` with `(Schema) 
metadata.get(INTERNAL_META_SCHEMA)`?






Re: [PR] [HUDI-6801] Implement merging partial updates from log files for MOR tables [hudi]

2023-10-23 Thread via GitHub


hudi-bot commented on PR #9883:
URL: https://github.com/apache/hudi/pull/9883#issuecomment-1774544385

   
   ## CI report:
   
   * c140ff462f58b649d45c782ce072b683cd908c1c Azure: 
[FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=20441)
 
   
   



Re: [PR] [HUDI-6801] Implement merging partial updates from log files for MOR tables [hudi]

2023-10-23 Thread via GitHub


hudi-bot commented on PR #9883:
URL: https://github.com/apache/hudi/pull/9883#issuecomment-1774534304

   
   ## CI report:
   
   * 8104637c318a0a12b646c7819a3ba6270d90 Azure: 
[FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=20439)
 
   * c140ff462f58b649d45c782ce072b683cd908c1c UNKNOWN
   
   



Re: [PR] [HUDI-6801] Implement merging partial updates from log files for MOR tables [hudi]

2023-10-22 Thread via GitHub


hudi-bot commented on PR #9883:
URL: https://github.com/apache/hudi/pull/9883#issuecomment-1774432431

   
   ## CI report:
   
   * 8104637c318a0a12b646c7819a3ba6270d90 Azure: 
[FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=20439)
 
   
   



Re: [PR] [HUDI-6801] Implement merging partial updates from log files for MOR tables [hudi]

2023-10-22 Thread via GitHub


hudi-bot commented on PR #9883:
URL: https://github.com/apache/hudi/pull/9883#issuecomment-1774426918

   
   ## CI report:
   
   * 8cec737f1e3a05a91c475d395cb08e74dd668c73 Azure: 
[FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=20436)
 
   * 8104637c318a0a12b646c7819a3ba6270d90 UNKNOWN
   
   



Re: [PR] [HUDI-6801] Implement merging partial updates from log files for MOR tables [hudi]

2023-10-22 Thread via GitHub


hudi-bot commented on PR #9883:
URL: https://github.com/apache/hudi/pull/9883#issuecomment-1774343094

   
   ## CI report:
   
   * 8cec737f1e3a05a91c475d395cb08e74dd668c73 Azure: 
[FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=20436)
 
   
   



Re: [PR] [HUDI-6801] Implement merging partial updates from log files for MOR tables [hudi]

2023-10-22 Thread via GitHub


hudi-bot commented on PR #9883:
URL: https://github.com/apache/hudi/pull/9883#issuecomment-1774261385

   
   ## CI report:
   
   * d1a976ddc81d1aa79df06fcbf72acd59c2c2b518 Azure: 
[FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=20433)
 
   * 8cec737f1e3a05a91c475d395cb08e74dd668c73 Azure: 
[PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=20436)
 
   
   



Re: [PR] [HUDI-6801] Implement merging partial updates from log files for MOR tables [hudi]

2023-10-22 Thread via GitHub


hudi-bot commented on PR #9883:
URL: https://github.com/apache/hudi/pull/9883#issuecomment-1774256792

   
   ## CI report:
   
   * d1a976ddc81d1aa79df06fcbf72acd59c2c2b518 Azure: 
[FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=20433)
 
   * 8cec737f1e3a05a91c475d395cb08e74dd668c73 UNKNOWN
   
   



Re: [PR] [HUDI-6801] Implement merging partial updates from log files for MOR tables [hudi]

2023-10-21 Thread via GitHub


hudi-bot commented on PR #9883:
URL: https://github.com/apache/hudi/pull/9883#issuecomment-1773965031

   
   ## CI report:
   
   * d1a976ddc81d1aa79df06fcbf72acd59c2c2b518 Azure: 
[FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=20433)
 
   
   



Re: [PR] [HUDI-6801] Implement merging partial updates from log files for MOR tables [hudi]

2023-10-21 Thread via GitHub


hudi-bot commented on PR #9883:
URL: https://github.com/apache/hudi/pull/9883#issuecomment-1773948280

   
   ## CI report:
   
   * d1a976ddc81d1aa79df06fcbf72acd59c2c2b518 Azure: 
[PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=20433)
 
   
   



Re: [PR] [HUDI-6801] Implement merging partial updates from log files for MOR tables [hudi]

2023-10-21 Thread via GitHub


hudi-bot commented on PR #9883:
URL: https://github.com/apache/hudi/pull/9883#issuecomment-1773947290

   
   ## CI report:
   
   * d1a976ddc81d1aa79df06fcbf72acd59c2c2b518 UNKNOWN
   
   