[GitHub] [hudi] shenh062326 commented on a change in pull request #1819: [HUDI-1058] Make delete marker configurable

2020-08-01 Thread GitBox


shenh062326 commented on a change in pull request #1819:
URL: https://github.com/apache/hudi/pull/1819#discussion_r464012445



##
File path: 
hudi-common/src/test/java/org/apache/hudi/common/model/TestOverwriteWithLatestAvroPayload.java
##
@@ -54,13 +57,15 @@ public void testActiveRecords() throws IOException {
 record1.put("id", "1");
 record1.put("partition", "partition0");
 record1.put("ts", 0L);
-record1.put("_hoodie_is_deleted", false);
+record1.put(defaultDeleteMarkerField, false);

Review comment:
   I will fix the comment.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [hudi] shenh062326 commented on a change in pull request #1819: [HUDI-1058] Make delete marker configurable

2020-08-01 Thread GitBox


shenh062326 commented on a change in pull request #1819:
URL: https://github.com/apache/hudi/pull/1819#discussion_r464010724



##
File path: 
hudi-client/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
##
@@ -267,15 +268,15 @@ private boolean deleteAllInstantsOlderorEqualsInAuxMetaFolder(HoodieInstant thre
     return success;
   }
 
-  public void archive(List instants) throws HoodieCommitException {
+  public void archive(JavaSparkContext jsc, List instants) throws HoodieCommitException {
     try {
       HoodieTimeline commitTimeline = metaClient.getActiveTimeline().getAllCommitsTimeline().filterCompletedInstants();
       Schema wrapperSchema = HoodieArchivedMetaEntry.getClassSchema();
       LOG.info("Wrapper schema " + wrapperSchema.toString());
       List records = new ArrayList<>();
       for (HoodieInstant hoodieInstant : instants) {
         try {
-          deleteAnyLeftOverMarkerFiles(hoodieInstant);
+          deleteAnyLeftOverMarkerFiles(jsc, hoodieInstant);

Review comment:
   Sorry, this change does not belong in this pull request; I will remove it.









[GitHub] [hudi] shenh062326 commented on a change in pull request #1819: [HUDI-1058] Make delete marker configurable

2020-08-01 Thread GitBox


shenh062326 commented on a change in pull request #1819:
URL: https://github.com/apache/hudi/pull/1819#discussion_r463940587



##
File path: 
hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestDeltaStreamerWithOverwriteLatestAvroPayload.java
##
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.functional;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
+import org.apache.hudi.utilities.sources.ParquetDFSSource;
+import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+
+public class TestDeltaStreamerWithOverwriteLatestAvroPayload extends UtilitiesTestBase {
+  private static String PARQUET_SOURCE_ROOT;
+  private static final String PROPS_FILENAME_TEST_PARQUET = "test-parquet-dfs-source.properties";
+
+  @BeforeAll
+  public static void initClass() throws Exception {
+    UtilitiesTestBase.initClass(true);
+    PARQUET_SOURCE_ROOT = dfsBasePath + "/parquetFiles";
+
+    // prepare the configs.
+    UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/base.properties", dfs, dfsBasePath + "/base.properties");
+    UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/sql-transformer.properties", dfs,
+        dfsBasePath + "/sql-transformer.properties");
+    UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/source.avsc", dfs, dfsBasePath + "/source.avsc");
+    UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/source-flattened.avsc", dfs, dfsBasePath + "/source-flattened.avsc");
+    UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/target.avsc", dfs, dfsBasePath + "/target.avsc");
+  }
+
+  @Test
+  public void testOverwriteLatestAvroPayload() throws Exception {
+    // test defaultDeleteField
+    this.testOverwriteLatestAvroPayload(null);
+
+    // test userDefinedDeleteField
+    this.testOverwriteLatestAvroPayload("user_defined_delete_field");
+  }
+
+  public void testOverwriteLatestAvroPayload(String deleteMarkerField) throws Exception {
+    String path = PARQUET_SOURCE_ROOT + "/1.parquet";
+    List records = HoodieTestDataGenerator.genericRecords(5, false, 0);
+    Helpers.saveParquetToDFS(records, new Path(path));
+
+    TypedProperties parquetProps = new TypedProperties();
+    parquetProps.setProperty("include", "base.properties");
+    parquetProps.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
+    parquetProps.setProperty("hoodie.datasource.write.partitionpath.field", "not_there");
+    parquetProps.setProperty("hoodie.deltastreamer.source.dfs.root", PARQUET_SOURCE_ROOT);
+    if (deleteMarkerField != null) {
+      parquetProps.setProperty(HoodieWriteConfig.DELETE_FIELD_PROP, deleteMarkerField);
+    }
+    Helpers.savePropsToDFS(parquetProps, dfs, dfsBasePath + "/" + PROPS_FILENAME_TEST_PARQUET);
+
+    String tableBasePath = dfsBasePath + "/test_overwrite_lastest_avro_payload_table";
+
+    HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(
+        TestHoodieDeltaStreamer.TestHelpers.makeConfig(tableBasePath, HoodieDeltaStreamer.Operation.INSERT, ParquetDFSSource.class.getName(),
+            null, PROPS_FILENAME_TEST_PARQUET, false,
+            false, 10, false, null, null, "timestamp"), jsc);
+    deltaStreamer.sync();
+    TestHoodieDeltaStreamer.TestHelpers.assertRecordCount(5, tableBasePath + "/*/*.parquet", sqlContext);
+
+    String path2 = PARQUET_SOURCE_ROOT + "/2.parquet";
+    List records2 = HoodieTestDataGenerator.genericRecords(2, true, 1);
+    Helpers.saveParquetToDFS(records2, new Path(path2));
+    deltaStreamer.sync();
+
+    TestHoodieDeltaStreamer.TestHelpers.assertRecordCount(3, tableBasePath + "/*/*.parquet", sqlContext);

Review comment:
   Since the returned Spark Row is not easy to compare with GenericRecord, 
it is easier for us to comp
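The count assertions in this test (5 rows after the first sync, 3 after the second) follow from latest-wins semantics keyed by record key, where a row whose delete flag is true removes its key. The sketch below models only that arithmetic, assuming the delete batch targets two existing keys (id_0 and id_1, as the later genericRecords helper in this PR suggests). ExpectedCountSketch, Row, and applyBatch are hypothetical names, not Hudi APIs.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ExpectedCountSketch {

  /** Hypothetical stand-in for one source row: a record key plus a delete flag. */
  static final class Row {
    final String key;
    final boolean deleted;

    Row(String key, boolean deleted) {
      this.key = key;
      this.deleted = deleted;
    }
  }

  /** Applies a batch with latest-wins semantics; rows flagged as deleted drop their key. */
  static void applyBatch(Map<String, Row> table, List<Row> batch) {
    for (Row row : batch) {
      if (row.deleted) {
        table.remove(row.key);
      } else {
        table.put(row.key, row);
      }
    }
  }

  public static void main(String[] args) {
    Map<String, Row> table = new LinkedHashMap<>();
    // First sync: five inserts, mirroring genericRecords(5, false, 0) (assumed keys id_0..id_4).
    applyBatch(table, List.of(new Row("id_0", false), new Row("id_1", false),
        new Row("id_2", false), new Row("id_3", false), new Row("id_4", false)));
    System.out.println(table.size()); // 5
    // Second sync: two deletes of existing keys, mirroring genericRecords(2, true, 1).
    applyBatch(table, List.of(new Row("id_0", true), new Row("id_1", true)));
    System.out.println(table.size()); // 3
  }
}
```

Comparing counts sidesteps converting Spark Rows back into GenericRecords, at the cost of not checking which keys survived.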

[GitHub] [hudi] shenh062326 commented on a change in pull request #1819: [HUDI-1058] Make delete marker configurable

2020-08-01 Thread GitBox


shenh062326 commented on a change in pull request #1819:
URL: https://github.com/apache/hudi/pull/1819#discussion_r463936014



##
File path: 
hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
##
@@ -251,6 +253,10 @@ public int getMaxConsistencyCheckIntervalMs() {
     return Integer.parseInt(props.getProperty(MAX_CONSISTENCY_CHECK_INTERVAL_MS_PROP));
   }
 
+  public String getDeleteField() {

Review comment:
   Thanks for your comments; I will fix it.
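For illustration, a getDeleteField() getter on a write config could read the property and fall back to the built-in marker name when it is unset. This is a minimal sketch, assuming a plain java.util.Properties backing store; the key string is a placeholder (the PR's actual constant is HoodieWriteConfig.DELETE_FIELD_PROP, whose value is not shown here), and DeleteFieldConfigSketch is a hypothetical class.

```java
import java.util.Properties;

public class DeleteFieldConfigSketch {
  // Placeholder key for illustration only; not Hudi's real property name.
  static final String DELETE_FIELD_PROP = "example.write.delete.field";
  static final String DEFAULT_DELETE_FIELD = "_hoodie_is_deleted";

  private final Properties props;

  DeleteFieldConfigSketch(Properties props) {
    this.props = props;
  }

  /** Returns the configured delete marker field, or the default when the key is absent. */
  public String getDeleteField() {
    return props.getProperty(DELETE_FIELD_PROP, DEFAULT_DELETE_FIELD);
  }

  public static void main(String[] args) {
    Properties props = new Properties();
    System.out.println(new DeleteFieldConfigSketch(props).getDeleteField()); // _hoodie_is_deleted
    props.setProperty(DELETE_FIELD_PROP, "user_defined_delete_field");
    System.out.println(new DeleteFieldConfigSketch(props).getDeleteField()); // user_defined_delete_field
  }
}
```

Note that Properties.getProperty(key, default) only applies the default when the key is missing; an explicitly configured empty string is returned as-is, so callers may still need an isEmpty() check.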









[GitHub] [hudi] shenh062326 commented on a change in pull request #1819: [HUDI-1058] Make delete marker configurable

2020-08-01 Thread GitBox


shenh062326 commented on a change in pull request #1819:
URL: https://github.com/apache/hudi/pull/1819#discussion_r463935979



##
File path: 
hudi-common/src/test/java/org/apache/hudi/common/model/TestOverwriteWithLatestAvroPayload.java
##
@@ -54,13 +57,15 @@ public void testActiveRecords() throws IOException {
 record1.put("id", "1");
 record1.put("partition", "partition0");
 record1.put("ts", 0L);
-record1.put("_hoodie_is_deleted", false);
+record1.put(defaultDeleteField, false);
+record1.put(deleteField, true);
 
 GenericRecord record2 = new GenericData.Record(schema);
 record2.put("id", "2");
 record2.put("partition", "partition1");
 record2.put("ts", 1L);
-record2.put("_hoodie_is_deleted", false);
+record2.put(defaultDeleteField, false);
+record2.put(deleteField, true);

Review comment:
   The same as above.









[GitHub] [hudi] shenh062326 commented on a change in pull request #1819: [HUDI-1058] Make delete marker configurable

2020-08-01 Thread GitBox


shenh062326 commented on a change in pull request #1819:
URL: https://github.com/apache/hudi/pull/1819#discussion_r463935932



##
File path: 
hudi-common/src/test/java/org/apache/hudi/common/model/TestOverwriteWithLatestAvroPayload.java
##
@@ -54,13 +57,15 @@ public void testActiveRecords() throws IOException {
 record1.put("id", "1");
 record1.put("partition", "partition0");
 record1.put("ts", 0L);
-record1.put("_hoodie_is_deleted", false);
+record1.put(defaultDeleteField, false);
+record1.put(deleteField, true);

Review comment:
   In fact, it does not matter whether deleteField is true or false here; only defaultDeleteField decides the result.
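The point above is that the payload consults exactly one field, so a record can carry both markers and only the resolved one matters. A minimal sketch of that check, using a Map as a stand-in for an Avro GenericRecord and assuming the marker counts only when it holds Boolean.TRUE; DeleteMarkerSketch and isDeleteRecord are hypothetical names, not the actual payload code.

```java
import java.util.HashMap;
import java.util.Map;

public class DeleteMarkerSketch {
  static final String DEFAULT_DELETE_FIELD = "_hoodie_is_deleted";

  /** Returns true only when the given (already resolved) delete field holds Boolean.TRUE. */
  static boolean isDeleteRecord(Map<String, Object> record, String deleteField) {
    Object marker = record.get(deleteField);
    return marker instanceof Boolean && (Boolean) marker;
  }

  public static void main(String[] args) {
    Map<String, Object> record = new HashMap<>();
    record.put("id", "1");
    record.put(DEFAULT_DELETE_FIELD, false);
    record.put("delete_field", true);

    // Payload using the default field: the custom field's value is ignored.
    System.out.println(isDeleteRecord(record, DEFAULT_DELETE_FIELD)); // false
    // Payload configured with the custom field: the default field's value is ignored.
    System.out.println(isDeleteRecord(record, "delete_field")); // true
  }
}
```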









[GitHub] [hudi] shenh062326 commented on a change in pull request #1819: [HUDI-1058] Make delete marker configurable

2020-07-29 Thread GitBox


shenh062326 commented on a change in pull request #1819:
URL: https://github.com/apache/hudi/pull/1819#discussion_r461266911



##
File path: 
hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java
##
@@ -36,6 +36,8 @@
 public class OverwriteWithLatestAvroPayload extends BaseAvroPayload
 implements HoodieRecordPayload {
 
+  private String deletedField = "_hoodie_is_deleted";

Review comment:
   Thanks for your comments; I will address them.

##
File path: 
hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala
##
@@ -100,6 +100,53 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
     }
   }
 
+  test("test OverwriteWithLatestAvroPayload with user defined delete field") {
+    val session = SparkSession.builder()
+      .appName("test_append_mode")
+      .master("local[2]")
+      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+      .getOrCreate()
+    val path = java.nio.file.Files.createTempDirectory("hoodie_test_path1")
+
+    try {
+      val sqlContext = session.sqlContext
+      val hoodieFooTableName = "hoodie_foo_tbl"
+
+      val keyField = "id"
+      val deleteField = "delete_field"
+
+      //create a new table
+      val fooTableModifier = Map("path" -> path.toAbsolutePath.toString,
+        HoodieWriteConfig.TABLE_NAME -> hoodieFooTableName,
+        "hoodie.insert.shuffle.parallelism" -> "2",
+        "hoodie.upsert.shuffle.parallelism" -> "2",
+        DELETE_FIELD_OPT_KEY -> deleteField,
+        RECORDKEY_FIELD_OPT_KEY -> keyField)
+      val fooTableParams = HoodieSparkSqlWriter.parametersWithWriteDefaults(fooTableModifier)
+
+      val dataFrame = session.createDataFrame(Seq(
+        (12, "ming", 20.23, "2018-01-01T13:51:39.340396Z", false),

Review comment:
   The records here do not need to scale; I will change the Seq to contain only one element.

##
File path: 
hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestDeltaStreamerWithOverwriteLatestAvroPayload.java
##
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.functional;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
+import org.apache.hudi.utilities.sources.ParquetDFSSource;
+import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+public class TestDeltaStreamerWithOverwriteLatestAvroPayload extends UtilitiesTestBase {
+  private static String PARQUET_SOURCE_ROOT;
+  private static final String PROPS_FILENAME_TEST_PARQUET = "test-parquet-dfs-source.properties";
+
+  @BeforeAll
+  public static void initClass() throws Exception {
+    UtilitiesTestBase.initClass(true);
+    PARQUET_SOURCE_ROOT = dfsBasePath + "/parquetFiles";
+
+    // prepare the configs.
+    UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/base.properties", dfs, dfsBasePath + "/base.properties");
+    UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/sql-transformer.properties", dfs,
+        dfsBasePath + "/sql-transformer.properties");
+    UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/source.avsc", dfs, dfsBasePath + "/source.avsc");
+    UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/source-flattened.avsc", dfs, dfsBasePath + "/source-flattened.avsc");
+    UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/target.avsc", dfs, dfsBasePath + "/target.avsc");
+  }
+
+  private static List genericRecords(int n, boolean isDeleteRecord, int instantTime) {
+    return IntStream.range(0, n).boxed().map(i -> {
+      String partitionPath = "partitionPath1";
+      HoodieKey key = new HoodieKey("id_" + i

[GitHub] [hudi] shenh062326 commented on a change in pull request #1819: [HUDI-1058] Make delete marker configurable

2020-07-21 Thread GitBox


shenh062326 commented on a change in pull request #1819:
URL: https://github.com/apache/hudi/pull/1819#discussion_r458181162



##
File path: 
hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java
##
@@ -66,8 +74,9 @@ public OverwriteWithLatestAvroPayload preCombine(OverwriteWithLatestAvroPayload
     }
 
     GenericRecord genericRecord = (GenericRecord) recordOption.get();
-    // combining strategy here trivially ignores currentValue on disk and writes this record
-    Object deleteMarker = genericRecord.get("_hoodie_is_deleted");
+    // combining strategy here trivially ignores currentValue on disk and writes this record吗
+    String deleteField = isDeletedField == null ? "_hoodie_is_deleted" : isDeletedField;

Review comment:
   > I don't see any tests being added as part of the patch. Would be nice 
to have some tests covering the new code that was added at all levels.
   > 
   > * WriteClient
   > * Datasource if there is an existing suite of tests for other write 
operations
   > * Deltastreamer
   
   Sorry for the late reply; I will add some test cases and address the comments.










[GitHub] [hudi] shenh062326 commented on a change in pull request #1819: [HUDI-1058] Make delete marker configurable

2020-07-18 Thread GitBox


shenh062326 commented on a change in pull request #1819:
URL: https://github.com/apache/hudi/pull/1819#discussion_r456771979



##
File path: 
hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java
##
@@ -36,6 +36,9 @@
 public class OverwriteWithLatestAvroPayload extends BaseAvroPayload
 implements HoodieRecordPayload {
 
+  public static final String DEFAULT_DELETE_FIELD = "_hoodie_is_deleted";

Review comment:
   Remove DEFAULT_DELETE_FIELD.









[GitHub] [hudi] shenh062326 commented on a change in pull request #1819: [HUDI-1058] Make delete marker configurable

2020-07-16 Thread GitBox


shenh062326 commented on a change in pull request #1819:
URL: https://github.com/apache/hudi/pull/1819#discussion_r455727810



##
File path: 
hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java
##
@@ -36,6 +36,9 @@
 public class OverwriteWithLatestAvroPayload extends BaseAvroPayload
 implements HoodieRecordPayload {
 
+  public static final String DEFAULT_DELETE_FIELD = "_hoodie_is_deleted";
+  private String userDefinedDeleteField = DEFAULT_DELETE_FIELD;

Review comment:
   If the variable is renamed to "isDeletedField" or "isDeletedMarkerField", the name suggests its type is boolean. Maybe we could keep userDefinedDeleteField and change its default value to null instead.
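One way to read the suggestion above: keep a String-typed field whose name says it holds a field name, leave it null when the user configures nothing, and resolve the default only at lookup time. A minimal sketch under that assumption; NullableDeleteFieldSketch and resolvedDeleteField are hypothetical names, not the PR's code.

```java
public class NullableDeleteFieldSketch {
  static final String DEFAULT_DELETE_FIELD = "_hoodie_is_deleted";

  // Named for what it holds (a field name), so it does not read like a boolean flag.
  private final String userDefinedDeleteField;

  NullableDeleteFieldSketch() {
    this(null); // no custom field configured
  }

  NullableDeleteFieldSketch(String userDefinedDeleteField) {
    this.userDefinedDeleteField = userDefinedDeleteField;
  }

  /** Falls back to the default field name when no custom (or an empty) field was given. */
  String resolvedDeleteField() {
    return userDefinedDeleteField == null || userDefinedDeleteField.isEmpty()
        ? DEFAULT_DELETE_FIELD
        : userDefinedDeleteField;
  }

  public static void main(String[] args) {
    System.out.println(new NullableDeleteFieldSketch().resolvedDeleteField()); // _hoodie_is_deleted
    System.out.println(new NullableDeleteFieldSketch("delete_field").resolvedDeleteField()); // delete_field
  }
}
```

A null default keeps "not configured" distinguishable from "configured to the default name", which an eagerly assigned default would hide.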









[GitHub] [hudi] shenh062326 commented on a change in pull request #1819: [HUDI-1058] Make delete marker configurable

2020-07-16 Thread GitBox


shenh062326 commented on a change in pull request #1819:
URL: https://github.com/apache/hudi/pull/1819#discussion_r455609009



##
File path: hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java
##
@@ -176,11 +177,21 @@ public static KeyGenerator createKeyGenerator(TypedProperties props) throws IOEx
   /**
    * Create a payload class via reflection, passing in an ordering/precombine value.
    */
-  public static HoodieRecordPayload createPayload(String payloadClass, GenericRecord record, Comparable orderingVal)
-      throws IOException {
+  public static HoodieRecordPayload createPayload(String payloadClass, GenericRecord record,
+                                                  Comparable orderingVal,
+                                                  String deleteField) throws IOException {
     try {
-      return (HoodieRecordPayload) ReflectionUtils.loadClass(payloadClass,
-          new Class[] {GenericRecord.class, Comparable.class}, record, orderingVal);
+      HoodieRecordPayload payload = null;
+      if (payloadClass.equals(OverwriteWithLatestAvroPayload.class.getName()) &&
+          !deleteField.isEmpty()) {

Review comment:
   done
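The diff above picks a different payload constructor when a delete field is supplied. A self-contained sketch of that idea using plain java.lang.reflect instead of Hudi's ReflectionUtils; DemoPayload and PayloadFactorySketch are hypothetical, and Object stands in for GenericRecord so the example needs no Avro dependency.

```java
import java.lang.reflect.Constructor;

public class PayloadFactorySketch {

  /** Hypothetical payload exposing both constructor shapes discussed in the diff. */
  public static class DemoPayload {
    final Object record;
    final Comparable<?> orderingVal;
    final String deleteField;

    public DemoPayload(Object record, Comparable<?> orderingVal) {
      this(record, orderingVal, null);
    }

    public DemoPayload(Object record, Comparable<?> orderingVal, String deleteField) {
      this.record = record;
      this.orderingVal = orderingVal;
      this.deleteField = deleteField;
    }
  }

  /** Uses the three-argument constructor only when a non-empty delete field is given. */
  static Object createPayload(String payloadClass, Object record, Comparable<?> orderingVal,
                              String deleteField) {
    try {
      Class<?> clazz = Class.forName(payloadClass);
      if (deleteField != null && !deleteField.isEmpty()) {
        Constructor<?> ctor = clazz.getConstructor(Object.class, Comparable.class, String.class);
        return ctor.newInstance(record, orderingVal, deleteField);
      }
      Constructor<?> ctor = clazz.getConstructor(Object.class, Comparable.class);
      return ctor.newInstance(record, orderingVal);
    } catch (ReflectiveOperationException e) {
      throw new IllegalArgumentException("Cannot instantiate payload " + payloadClass, e);
    }
  }

  public static void main(String[] args) {
    String name = DemoPayload.class.getName(); // binary name of the nested class
    DemoPayload a = (DemoPayload) createPayload(name, "rec", 1L, "");
    DemoPayload b = (DemoPayload) createPayload(name, "rec", 1L, "delete_field");
    System.out.println(a.deleteField); // null
    System.out.println(b.deleteField); // delete_field
  }
}
```

The sketch also guards against a null deleteField before calling isEmpty(), which the quoted condition does not do.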





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [hudi] shenh062326 commented on a change in pull request #1819: [HUDI-1058] Make delete marker configurable

2020-07-12 Thread GitBox


shenh062326 commented on a change in pull request #1819:
URL: https://github.com/apache/hudi/pull/1819#discussion_r453293178



##
File path: 
hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java
##
@@ -67,7 +74,8 @@ public OverwriteWithLatestAvroPayload preCombine(OverwriteWithLatestAvroPayload
 
     GenericRecord genericRecord = (GenericRecord) recordOption.get();
     // combining strategy here trivially ignores currentValue on disk and writes this record
-    Object deleteMarker = genericRecord.get("_hoodie_is_deleted");
+    String deleteField = userDefineDeleteField == null ? DEFAULT_DELETE_FIELD : userDefineDeleteField;
+    Object deleteMarker = genericRecord.get(deleteField);

Review comment:
   Sounds good. But do we need to add the new method to the HoodieRecordPayload interface? If so, do all classes that already implement the interface need to implement this method as well?
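On the interface question: in Java 8 and later, a method added to an existing interface with a default body does not force existing implementations to change; only classes that need different behavior override it. A minimal sketch of that mechanism, where RecordPayload, LegacyPayload, and ConfigurablePayload are hypothetical stand-ins rather than Hudi's HoodieRecordPayload hierarchy.

```java
public class DefaultMethodSketch {

  // Stand-in for a payload interface; not Hudi's actual HoodieRecordPayload.
  interface RecordPayload {
    boolean isDeleted();

    // Newly added method with a default body: implementors may override it but need not.
    default String deleteMarkerField() {
      return "_hoodie_is_deleted";
    }
  }

  // An implementation written before the new method existed still compiles unchanged.
  static class LegacyPayload implements RecordPayload {
    public boolean isDeleted() {
      return false;
    }
  }

  // A newer implementation opts in by overriding the default.
  static class ConfigurablePayload implements RecordPayload {
    private final String field;

    ConfigurablePayload(String field) {
      this.field = field;
    }

    public boolean isDeleted() {
      return false;
    }

    @Override
    public String deleteMarkerField() {
      return field;
    }
  }

  public static void main(String[] args) {
    System.out.println(new LegacyPayload().deleteMarkerField()); // _hoodie_is_deleted
    System.out.println(new ConfigurablePayload("delete_field").deleteMarkerField()); // delete_field
  }
}
```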
   
   
   




