nsivabalan commented on code in PR #12122:
URL: https://github.com/apache/hudi/pull/12122#discussion_r1807047422
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergedReadHandle.java:
##########
@@ -166,6 +167,12 @@ private List<HoodieRecord<T>>
doMergedRead(Option<HoodieFileReader> baseFileRead
String key = record.getRecordKey();
if (deltaRecordMap.containsKey(key)) {
deltaRecordKeys.remove(key);
+ // When internal operation exists, it means there are at least one
delete in between.
+ // Therefore, no need to merge with the base record.
Review Comment:
do you think we can check if value is equal to "Delete" operation.
##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java:
##########
@@ -99,7 +101,14 @@ protected <T> void processNextRecord(HoodieRecord<T>
newRecord) throws IOExcepti
// NOTE: Record have to be cloned here to make sure if it holds
low-level engine-specific
// payload pointing into a shared, mutable (underlying) buffer
we get a clean copy of
// it since these records will be put into records(Map).
- records.put(key, latestHoodieRecord.copy());
+ HoodieRecord finalRecord = latestHoodieRecord.copy();
+
+ // Reserve the delete information.
+ if (prevRecord.isDelete(readerSchema, this.getPayloadProps())
Review Comment:
should we try to add some utils methods and use it across the board. for eg,
to account for both payloadProps and tableConfig.
##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java:
##########
@@ -99,7 +101,14 @@ protected <T> void processNextRecord(HoodieRecord<T>
newRecord) throws IOExcepti
// NOTE: Record have to be cloned here to make sure if it holds
low-level engine-specific
// payload pointing into a shared, mutable (underlying) buffer
we get a clean copy of
// it since these records will be put into records(Map).
- records.put(key, latestHoodieRecord.copy());
+ HoodieRecord finalRecord = latestHoodieRecord.copy();
+
+ // Reserve the delete information.
+ if (prevRecord.isDelete(readerSchema, this.getPayloadProps())
Review Comment:
I see BaseHMergedLogRecordScanner L98ish, we account for payload props, and
tableConfig.getPreombineKey.
why we are not accounting for tableConfig here.
If we are sure payload props is good enough?
##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java:
##########
@@ -278,14 +289,19 @@ protected Option<DeleteRecord>
doProcessNextDeletedRecord(DeleteRecord deleteRec
if
(isDeleteRecordWithNaturalOrder(existingRecordMetadataPair.getLeft(),
existingOrderingVal)) {
return Option.empty();
}
- Comparable deleteOrderingVal = deleteRecord.getOrderingValue();
- // Checks the ordering value does not equal to 0
- // because we use 0 as the default value which means natural order
- boolean chooseExisting = !deleteOrderingVal.equals(0)
- && ReflectionUtils.isSameClass(existingOrderingVal,
deleteOrderingVal)
- && existingOrderingVal.compareTo(deleteOrderingVal) > 0;
+ Comparable deleteOrderingVal = readerContext.getOrderingValue(
Review Comment:
can we define a boolean value named "orderingValueMissing" and so code
readabiltiy is better.
##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java:
##########
@@ -278,14 +289,19 @@ protected Option<DeleteRecord>
doProcessNextDeletedRecord(DeleteRecord deleteRec
if
(isDeleteRecordWithNaturalOrder(existingRecordMetadataPair.getLeft(),
existingOrderingVal)) {
return Option.empty();
}
- Comparable deleteOrderingVal = deleteRecord.getOrderingValue();
- // Checks the ordering value does not equal to 0
- // because we use 0 as the default value which means natural order
- boolean chooseExisting = !deleteOrderingVal.equals(0)
- && ReflectionUtils.isSameClass(existingOrderingVal,
deleteOrderingVal)
- && existingOrderingVal.compareTo(deleteOrderingVal) > 0;
+ Comparable deleteOrderingVal = readerContext.getOrderingValue(
+ Option.empty(), Collections.emptyMap(), readerSchema,
orderingFieldName, orderingFieldType, orderingFieldDefault);
+ deleteOrderingVal = deleteRecord.getOrderingValue() == null ?
deleteOrderingVal : deleteRecord.getOrderingValue();
Review Comment:
for now, if we are going to take a stand that delete will just override any
previous value, why can't we just set the default ordering value
(readerContext.castValue(0, orderingFieldType)) as the ordering value for
delete records. and it make the code simple and easy to read.
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestDeleteRecordLogic.scala:
##########
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.table.read
+
+import org.apache.hudi.DataSourceWriteOptions.{OPERATION, PRECOMBINE_FIELD,
RECORDKEY_FIELD, TABLE_TYPE}
+import org.apache.hudi.common.config.{HoodieReaderConfig, HoodieStorageConfig}
+import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.testutils.SparkClientFunctionalTestHarness
+import org.apache.hudi.{DataSourceWriteOptions, DefaultSparkRecordMerger}
+import org.apache.spark.sql.SaveMode
+import org.junit.jupiter.api.Assertions.assertTrue
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.{Arguments, MethodSource}
+
+class TestDeleteRecordLogic extends SparkClientFunctionalTestHarness{
+ val expected = Seq(
+ (14, "5", "rider-Z", "driver-Z", 17.85, 3),
+ (-9, "4", "rider-DDDD", "driver-DDDD", 20.0, 1),
+ (10, "3", "rider-C", "driver-C", 33.9, 10),
+ (10, "2", "rider-B", "driver-B", 27.7, 1))
+
+ @ParameterizedTest
+ @MethodSource(Array("provideParams"))
+ def testDeleteLogic(useFgReader: String, tableType: String, recordType:
String): Unit = {
+ val sparkOpts: Map[String, String] = Map(
+ HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key -> "parquet",
+ HoodieWriteConfig.RECORD_MERGER_IMPLS.key ->
classOf[DefaultSparkRecordMerger].getName)
+ val fgReaderOpts: Map[String, String] = Map(
+ HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key -> useFgReader)
+
+ val opts = if (recordType.equals("SPARK")) sparkOpts ++ fgReaderOpts else
fgReaderOpts
+ val columns = Seq("ts", "key", "rider", "driver", "fare", "number")
+
+ val data = Seq(
+ (10, "1", "rider-A", "driver-A", 19.10, 7),
+ (10, "2", "rider-B", "driver-B", 27.70, 1),
+ (10, "3", "rider-C", "driver-C", 33.90, 10),
+ (-1, "4", "rider-D", "driver-D", 34.15, 6),
+ (10, "5", "rider-E", "driver-E", 17.85, 10))
+ val inserts = spark.createDataFrame(data).toDF(columns: _*)
+ inserts.write.format("hudi").
+ option(RECORDKEY_FIELD.key(), "key").
+ option(PRECOMBINE_FIELD.key(), "ts").
+ option(TABLE_TYPE.key(), tableType).
+ option(DataSourceWriteOptions.TABLE_NAME.key(), "test_table").
+ options(opts).
+ mode(SaveMode.Overwrite).
+ save(basePath)
+
+ val updateData = Seq(
+ (11, "1", "rider-X", "driver-X", 19.10, 9),
+ (9, "2", "rider-Y", "driver-Y", 27.70, 7))
+ val updates = spark.createDataFrame(updateData).toDF(columns: _*)
+ updates.write.format("hudi").
+ option(RECORDKEY_FIELD.key(), "key").
+ option(PRECOMBINE_FIELD.key(), "ts").
+ option(TABLE_TYPE.key(), tableType).
+ option(OPERATION.key(), "upsert").
+ options(opts).
+ mode(SaveMode.Append).
+ save(basePath)
+
+ val deletesData = Seq((-5, "4", "rider-D", "driver-D", 34.15, 6))
+ val deletes = spark.createDataFrame(deletesData).toDF(columns: _*)
+ deletes.write.format("hudi").
+ option(RECORDKEY_FIELD.key(), "key").
+ option(PRECOMBINE_FIELD.key(), "ts").
+ option(TABLE_TYPE.key(), tableType).
+ option(OPERATION.key(), "delete").
+ options(opts).
+ mode(SaveMode.Append).
+ save(basePath)
+
+ val secondUpdateData = Seq(
+ (14, "5", "rider-Z", "driver-Z", 17.85, 3),
+ (-10, "4", "rider-DD", "driver-DD", 34.15, 5))
+ val secondUpdates = spark.createDataFrame(secondUpdateData).toDF(columns:
_*)
+ secondUpdates.write.format("hudi").
+ option(RECORDKEY_FIELD.key(), "key").
+ option(PRECOMBINE_FIELD.key(), "ts").
+ option(TABLE_TYPE.key(), tableType).
+ option(OPERATION.key(), "upsert").
+ options(opts).
+ mode(SaveMode.Append).
+ save(basePath)
+
+ val secondDeletesData = Seq(
+ (10, "4", "rider-D", "driver-D", 34.15, 6),
+ (0, "1", "rider-X", "driver-X", 19.10, 8))
+ val secondDeletes = spark.createDataFrame(secondDeletesData).toDF(columns:
_*)
+ secondDeletes.write.format("hudi").
+ option(RECORDKEY_FIELD.key(), "key").
+ option(PRECOMBINE_FIELD.key(), "ts").
+ option(TABLE_TYPE.key(), tableType).
+ option(OPERATION.key(), "delete").
+ options(opts).
+ mode(SaveMode.Append).
+ save(basePath)
+
+ val thirdUpdateData = Seq((-8, "4", "rider-DDD", "driver-DDD", 20.00, 1))
+ val thirdUpdates = spark.createDataFrame(thirdUpdateData).toDF(columns: _*)
+ thirdUpdates.write.format("hudi").
+ option(RECORDKEY_FIELD.key(), "key").
+ option(PRECOMBINE_FIELD.key(), "ts").
+ option(TABLE_TYPE.key(), tableType).
+ option(OPERATION.key(), "upsert").
+ options(opts).
+ mode(SaveMode.Append).
+ save(basePath)
+
+ val thirdDeletesData = Seq(
+ (10, "4", "rider-D4", "driver-D4", 34.15, 6),
+ (0, "1", "rider-X", "driver-X", 19.10, 8))
+ val thirdDeletes = spark.createDataFrame(thirdDeletesData).toDF(columns:
_*)
+ thirdDeletes.write.format("hudi").
+ option(RECORDKEY_FIELD.key(), "key").
+ option(PRECOMBINE_FIELD.key(), "ts").
+ option(TABLE_TYPE.key(), tableType).
+ option(OPERATION.key(), "delete").
+ options(opts).
+ mode(SaveMode.Append).
+ save(basePath)
+
+ val fourUpdateData = Seq((-9, "4", "rider-DDDD", "driver-DDDD", 20.00, 1))
+ val fourUpdates = spark.createDataFrame(fourUpdateData).toDF(columns: _*)
+ fourUpdates.write.format("hudi").
+ option(RECORDKEY_FIELD.key(), "key").
+ option(PRECOMBINE_FIELD.key(), "ts").
+ option(TABLE_TYPE.key(), tableType).
+ option(OPERATION.key(), "upsert").
+ options(opts).
+ mode(SaveMode.Append).
+ save(basePath)
+
+ // Read data to compare.
+ val df = spark.read.format("hudi").options(opts).load(basePath)
+ val finalDf = df.select("ts", "key", "rider", "driver", "fare",
"number").sort("ts")
+ finalDf.show(false)
+
+ val expectedDf = spark.createDataFrame(expected).toDF(columns:
_*).sort("ts")
+ val expectedMinusActual = expectedDf.except(finalDf)
+ val actualMinusExpected = finalDf.except(expectedDf)
+
+ expectedMinusActual.show(false)
+ actualMinusExpected.show(false)
Review Comment:
for MOR, can we trigger one final compaction and ensure the expected value
stays intact.
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/Iterators.scala:
##########
@@ -302,33 +300,49 @@ class RecordMergingFileIterator(logFiles:
List[HoodieLogFile],
private def merge(curRow: InternalRow, newRecord: HoodieRecord[_]):
Option[InternalRow] = {
// NOTE: We have to pass in Avro Schema used to read from Delta Log file
since we invoke combining API
// on the record from the Delta Log
- recordMerger.getRecordType match {
- case HoodieRecordType.SPARK =>
- val curRecord = new HoodieSparkRecord(curRow, readerSchema)
- val result = recordMerger.merge(curRecord, baseFileReaderAvroSchema,
newRecord, logFileReaderAvroSchema, payloadProps)
- toScalaOption(result)
- .flatMap { r =>
- val data = r.getLeft.getData.asInstanceOf[InternalRow]
- if (isDeleteOperation(data)) {
- None
- } else {
- val schema = HoodieInternalRowUtils.getCachedSchema(r.getRight)
- val projection =
HoodieInternalRowUtils.getCachedUnsafeProjection(schema, structTypeSchema)
- Some(projection.apply(data))
+
+ // Delete records are in-between; no merge is needed.
+ if
(newRecord.getMetaDataInfo(HoodieReaderContext.INTERNAL_META_OPERATION).isPresent)
{
Review Comment:
lets add an example and say why we need this processing.
##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java:
##########
@@ -99,7 +101,14 @@ protected <T> void processNextRecord(HoodieRecord<T>
newRecord) throws IOExcepti
// NOTE: Record have to be cloned here to make sure if it holds
low-level engine-specific
// payload pointing into a shared, mutable (underlying) buffer
we get a clean copy of
// it since these records will be put into records(Map).
- records.put(key, latestHoodieRecord.copy());
+ HoodieRecord finalRecord = latestHoodieRecord.copy();
+
+ // Reserve the delete information.
+ if (prevRecord.isDelete(readerSchema, this.getPayloadProps())
Review Comment:
bcoz, we could be using 1.x hudi binary to read a 0.10.x table as well. So,
payload props may not be present.
btw, can we check if payload props are set while using it as a reader?
bcoz, on the reader, only info we have is the table config. So, I would
expect we read the table config and set these payload props.
If not, payload props might be empty even while reading a 1.x table.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]