nsivabalan commented on a change in pull request #1819:
URL: https://github.com/apache/hudi/pull/1819#discussion_r463548033



##########
File path: 
hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
##########
@@ -177,6 +178,18 @@ public RawTripTestPayload 
generateRandomValueAsPerSchema(String schemaStr, Hoodi
     return null;
   }
 
+  public static List<GenericRecord> genericRecords(int n, boolean 
isDeleteRecord, int instantTime) {

Review comment:
       Please rename this to generateGenericRecords.

##########
File path: 
hudi-common/src/test/java/org/apache/hudi/common/model/TestOverwriteWithLatestAvroPayload.java
##########
@@ -37,14 +37,17 @@
 public class TestOverwriteWithLatestAvroPayload {
 
   private Schema schema;
+  String defaultDeleteField = "_hoodie_is_deleted";
+  String deleteField = "delete_field";

Review comment:
       Please rename this to deleteMarkerField.

##########
File path: 
hudi-common/src/test/java/org/apache/hudi/common/model/TestOverwriteWithLatestAvroPayload.java
##########
@@ -54,13 +57,15 @@ public void testActiveRecords() throws IOException {
     record1.put("id", "1");
     record1.put("partition", "partition0");
     record1.put("ts", 0L);
-    record1.put("_hoodie_is_deleted", false);
+    record1.put(defaultDeleteField, false);
+    record1.put(deleteField, true);

Review comment:
       Shouldn't this also be false?

##########
File path: hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java
##########
@@ -215,11 +216,20 @@ public static HoodieDateTimeParser 
createDateTimeParser(TypedProperties props, S
   /**
    * Create a payload class via reflection, passing in an ordering/precombine 
value.
    */
-  public static HoodieRecordPayload createPayload(String payloadClass, 
GenericRecord record, Comparable orderingVal)
-      throws IOException {
+  public static HoodieRecordPayload createPayload(String payloadClass, 
GenericRecord record,
+                                                  Comparable orderingVal,
+                                                  String deleteField) throws 
IOException {

Review comment:
       Please rename this to deleteMarkerField.

##########
File path: 
hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestDeltaStreamerWithOverwriteLatestAvroPayload.java
##########
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.functional;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
+import org.apache.hudi.utilities.sources.ParquetDFSSource;
+import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+
+public class TestDeltaStreamerWithOverwriteLatestAvroPayload extends 
UtilitiesTestBase {
+  private static String PARQUET_SOURCE_ROOT;
+  private static final String PROPS_FILENAME_TEST_PARQUET = 
"test-parquet-dfs-source.properties";
+
+  @BeforeAll
+  public static void initClass() throws Exception {
+    UtilitiesTestBase.initClass(true);
+    PARQUET_SOURCE_ROOT = dfsBasePath + "/parquetFiles";
+
+    // prepare the configs.
+    
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/base.properties", 
dfs, dfsBasePath + "/base.properties");
+    
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/sql-transformer.properties",
 dfs,
+        dfsBasePath + "/sql-transformer.properties");
+    UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/source.avsc", 
dfs, dfsBasePath + "/source.avsc");
+    
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/source-flattened.avsc",
 dfs, dfsBasePath + "/source-flattened.avsc");
+    UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/target.avsc", 
dfs, dfsBasePath + "/target.avsc");
+  }
+
+  @Test
+  public void testOverwriteLatestAvroPayload() throws Exception {
+    // test defaultDeleteField
+    this.testOverwriteLatestAvroPayload(null);
+
+    // test userDefinedDeleteField
+    this.testOverwriteLatestAvroPayload("user_defined_delete_field");
+  }
+
+  public void testOverwriteLatestAvroPayload(String deleteMarkerField) throws 
Exception {

Review comment:
       This helper should be private.

##########
File path: 
hudi-common/src/test/java/org/apache/hudi/common/model/TestOverwriteWithLatestAvroPayload.java
##########
@@ -54,13 +57,15 @@ public void testActiveRecords() throws IOException {
     record1.put("id", "1");
     record1.put("partition", "partition0");
     record1.put("ts", 0L);
-    record1.put("_hoodie_is_deleted", false);
+    record1.put(defaultDeleteField, false);
+    record1.put(deleteField, true);
 
     GenericRecord record2 = new GenericData.Record(schema);
     record2.put("id", "2");
     record2.put("partition", "partition1");
     record2.put("ts", 1L);
-    record2.put("_hoodie_is_deleted", false);
+    record2.put(defaultDeleteField, false);
+    record2.put(deleteField, true);

Review comment:
       Same here.

##########
File path: 
hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
##########
@@ -251,6 +253,10 @@ public int getMaxConsistencyCheckIntervalMs() {
     return 
Integer.parseInt(props.getProperty(MAX_CONSISTENCY_CHECK_INTERVAL_MS_PROP));
   }
 
+  public String getDeleteField() {

Review comment:
       Let's be uniform throughout: getDeleteMarkerField.

##########
File path: hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java
##########
@@ -275,8 +285,9 @@ public static HoodieWriteClient 
createHoodieClient(JavaSparkContext jssc, String
   }
 
   public static HoodieRecord createHoodieRecord(GenericRecord gr, Comparable 
orderingVal, HoodieKey hKey,
-                                                String payloadClass) throws 
IOException {
-    HoodieRecordPayload payload = DataSourceUtils.createPayload(payloadClass, 
gr, orderingVal);
+                                                String payloadClass,
+                                                String deleteField) throws 
IOException {

Review comment:
       Please rename this to deleteMarkerField.

##########
File path: 
hudi-common/src/test/java/org/apache/hudi/common/model/TestOverwriteWithLatestAvroPayload.java
##########
@@ -80,13 +85,16 @@ public void testDeletedRecord() throws IOException {
     record1.put("id", "1");
     record1.put("partition", "partition0");
     record1.put("ts", 0L);
-    record1.put("_hoodie_is_deleted", false);
+    record1.put(defaultDeleteField, false);

Review comment:
       Can we also fix lines 70 and 71? When creating the 
OverwriteWithLatestAvroPayload, pass in the delete marker field. Again, can we 
test both the default and the user-defined field? Since these are active records, 
the existing assertions should still work. 

##########
File path: 
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
##########
@@ -337,9 +337,11 @@ private void refreshTimeline() throws IOException {
     }
 
     JavaRDD<GenericRecord> avroRDD = avroRDDOptional.get();
+    String deleteField = props.getString(HoodieWriteConfig.DELETE_FIELD_PROP, 
HoodieWriteConfig.DEFAULT_DELETE_FIELD);

Review comment:
       Please rename this to deleteMarkerField.

##########
File path: 
hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestDeltaStreamerWithOverwriteLatestAvroPayload.java
##########
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.functional;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
+import org.apache.hudi.utilities.sources.ParquetDFSSource;
+import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+
+public class TestDeltaStreamerWithOverwriteLatestAvroPayload extends 
UtilitiesTestBase {
+  private static String PARQUET_SOURCE_ROOT;
+  private static final String PROPS_FILENAME_TEST_PARQUET = 
"test-parquet-dfs-source.properties";
+
+  @BeforeAll
+  public static void initClass() throws Exception {
+    UtilitiesTestBase.initClass(true);
+    PARQUET_SOURCE_ROOT = dfsBasePath + "/parquetFiles";
+
+    // prepare the configs.
+    
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/base.properties", 
dfs, dfsBasePath + "/base.properties");
+    
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/sql-transformer.properties",
 dfs,
+        dfsBasePath + "/sql-transformer.properties");
+    UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/source.avsc", 
dfs, dfsBasePath + "/source.avsc");
+    
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/source-flattened.avsc",
 dfs, dfsBasePath + "/source-flattened.avsc");
+    UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/target.avsc", 
dfs, dfsBasePath + "/target.avsc");
+  }
+
+  @Test
+  public void testOverwriteLatestAvroPayload() throws Exception {
+    // test defaultDeleteField
+    this.testOverwriteLatestAvroPayload(null);
+
+    // test userDefinedDeleteField
+    this.testOverwriteLatestAvroPayload("user_defined_delete_field");
+  }
+
+  public void testOverwriteLatestAvroPayload(String deleteMarkerField) throws 
Exception {
+    String path = PARQUET_SOURCE_ROOT + "/1.parquet";
+    List<GenericRecord> records = HoodieTestDataGenerator.genericRecords(5, 
false, 0);
+    Helpers.saveParquetToDFS(records, new Path(path));
+
+    TypedProperties parquetProps = new TypedProperties();
+    parquetProps.setProperty("include", "base.properties");
+    parquetProps.setProperty("hoodie.datasource.write.recordkey.field", 
"_row_key");
+    parquetProps.setProperty("hoodie.datasource.write.partitionpath.field", 
"not_there");
+    parquetProps.setProperty("hoodie.deltastreamer.source.dfs.root", 
PARQUET_SOURCE_ROOT);
+    if (deleteMarkerField != null) {
+      parquetProps.setProperty(HoodieWriteConfig.DELETE_FIELD_PROP, 
deleteMarkerField);
+    }
+    Helpers.savePropsToDFS(parquetProps, dfs, dfsBasePath + "/" + 
PROPS_FILENAME_TEST_PARQUET);
+
+    String tableBasePath = dfsBasePath + 
"/test_overwrite_lastest_avro_payload_table";
+
+    HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(
+        TestHoodieDeltaStreamer.TestHelpers.makeConfig(tableBasePath, 
HoodieDeltaStreamer.Operation.INSERT, ParquetDFSSource.class.getName(),
+            null, PROPS_FILENAME_TEST_PARQUET, false,
+            false, 100000, false, null, null, "timestamp"), jsc);
+    deltaStreamer.sync();
+    TestHoodieDeltaStreamer.TestHelpers.assertRecordCount(5, tableBasePath + 
"/*/*.parquet", sqlContext);
+
+    String path2 = PARQUET_SOURCE_ROOT + "/2.parquet";
+    List<GenericRecord> records2 = HoodieTestDataGenerator.genericRecords(2, 
true, 1);
+    Helpers.saveParquetToDFS(records2, new Path(path2));
+    deltaStreamer.sync();
+
+    TestHoodieDeltaStreamer.TestHelpers.assertRecordCount(3, tableBasePath + 
"/*/*.parquet", sqlContext);

Review comment:
       Is it possible to verify the records for equality? 

##########
File path: 
hudi-common/src/test/java/org/apache/hudi/common/model/TestOverwriteWithLatestAvroPayload.java
##########
@@ -98,6 +106,18 @@ public void testDeletedRecord() throws IOException {
 
     assertEquals(payload1.combineAndGetUpdateValue(delRecord1, schema).get(), 
record1);
     assertFalse(payload2.combineAndGetUpdateValue(record1, 
schema).isPresent());
+
+    // test userDefinedDeleteField

Review comment:
       We could move this into a private method so that lines 99 to 108 and 110 to 
120 can reuse the code.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to