[GitHub] [hudi] nsivabalan commented on a change in pull request #1819: [HUDI-1058] Make delete marker configurable

2020-08-01 Thread GitBox


nsivabalan commented on a change in pull request #1819:
URL: https://github.com/apache/hudi/pull/1819#discussion_r463979191



##
File path: 
hudi-client/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
##
@@ -267,15 +268,15 @@ private boolean 
deleteAllInstantsOlderorEqualsInAuxMetaFolder(HoodieInstant thre
 return success;
   }
 
-  public void archive(List instants) throws 
HoodieCommitException {
+  public void archive(JavaSparkContext jsc, List instants) 
throws HoodieCommitException {
 try {
   HoodieTimeline commitTimeline = 
metaClient.getActiveTimeline().getAllCommitsTimeline().filterCompletedInstants();
   Schema wrapperSchema = HoodieArchivedMetaEntry.getClassSchema();
   LOG.info("Wrapper schema " + wrapperSchema.toString());
   List records = new ArrayList<>();
   for (HoodieInstant hoodieInstant : instants) {
 try {
-  deleteAnyLeftOverMarkerFiles(hoodieInstant);
+  deleteAnyLeftOverMarkerFiles(jsc, hoodieInstant);

Review comment:
   Sorry, why are these changes being made in this PR? This PR is meant for 
the delete marker field. Are these changes related to the user-defined delete 
marker field?

##
File path: 
hudi-common/src/test/java/org/apache/hudi/common/model/TestOverwriteWithLatestAvroPayload.java
##
@@ -54,13 +57,15 @@ public void testActiveRecords() throws IOException {
 record1.put("id", "1");
 record1.put("partition", "partition0");
 record1.put("ts", 0L);
-record1.put("_hoodie_is_deleted", false);
+record1.put(defaultDeleteMarkerField, false);

Review comment:
   Actually, we could test it this way (not sure if you already do that): 
set the default marker field value to true and the user-defined one to false. 
If OverwriteWithLatestAvroPayload is instantiated w/o any marker field, the 
record should be deleted. If OverwriteWithLatestAvroPayload is instantiated w/ 
the user-defined marker field, the record should be considered active. Vice 
versa as well. All tests in this class could be done this way to ensure that 
the other column is treated as just another user data column and Hudi does not 
care about it.
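
The test matrix described above can be sketched roughly as follows. This is a minimal illustration, not the PR's test code: a plain `Map` stands in for Avro's `GenericRecord`, and `MarkerFieldTestSketch`, `isDeleted` and `record` are hypothetical names introduced only for this example.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: only the configured marker column decides deletion;
// the other marker-like column is treated as ordinary user data.
final class MarkerFieldTestSketch {
  static final String DEFAULT_MARKER = "_hoodie_is_deleted";
  static final String USER_MARKER = "delete_field";

  // A record counts as a delete only if the configured marker column holds true.
  static boolean isDeleted(Map<String, Object> record, String markerField) {
    return Boolean.TRUE.equals(record.get(markerField));
  }

  // Builds a record carrying both columns with independent values.
  static Map<String, Object> record(boolean defaultMarker, boolean userMarker) {
    Map<String, Object> r = new HashMap<>();
    r.put("id", "1");
    r.put("partition", "partition0");
    r.put("ts", 0L);
    r.put(DEFAULT_MARKER, defaultMarker);
    r.put(USER_MARKER, userMarker);
    return r;
  }

  public static void main(String[] args) {
    // Default marker true, user-defined marker false.
    Map<String, Object> r = record(true, false);
    System.out.println(isDeleted(r, DEFAULT_MARKER)); // true: deleted under the default field
    System.out.println(isDeleted(r, USER_MARKER));    // false: active under the user-defined field

    // Vice versa.
    Map<String, Object> v = record(false, true);
    System.out.println(isDeleted(v, DEFAULT_MARKER)); // false
    System.out.println(isDeleted(v, USER_MARKER));    // true
  }
}
```

Giving the two columns opposite values in every case makes a test fail if the payload ever consults the wrong column.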





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [hudi] nsivabalan commented on a change in pull request #1819: [HUDI-1058] Make delete marker configurable

2020-07-31 Thread GitBox


nsivabalan commented on a change in pull request #1819:
URL: https://github.com/apache/hudi/pull/1819#discussion_r463548033



##
File path: 
hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
##
@@ -177,6 +178,18 @@ public RawTripTestPayload 
generateRandomValueAsPerSchema(String schemaStr, Hoodi
 return null;
   }
 
+  public static List genericRecords(int n, boolean 
isDeleteRecord, int instantTime) {

Review comment:
   generateGenericRecords.

##
File path: 
hudi-common/src/test/java/org/apache/hudi/common/model/TestOverwriteWithLatestAvroPayload.java
##
@@ -37,14 +37,17 @@
 public class TestOverwriteWithLatestAvroPayload {
 
   private Schema schema;
+  String defaultDeleteField = "_hoodie_is_deleted";
+  String deleteField = "delete_field";

Review comment:
   deleteMarkerField

##
File path: 
hudi-common/src/test/java/org/apache/hudi/common/model/TestOverwriteWithLatestAvroPayload.java
##
@@ -54,13 +57,15 @@ public void testActiveRecords() throws IOException {
 record1.put("id", "1");
 record1.put("partition", "partition0");
 record1.put("ts", 0L);
-record1.put("_hoodie_is_deleted", false);
+record1.put(defaultDeleteField, false);
+record1.put(deleteField, true);

Review comment:
   Shouldn't this also be false?

##
File path: hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java
##
@@ -215,11 +216,20 @@ public static HoodieDateTimeParser 
createDateTimeParser(TypedProperties props, S
   /**
* Create a payload class via reflection, passing in an ordering/precombine 
value.
*/
-  public static HoodieRecordPayload createPayload(String payloadClass, 
GenericRecord record, Comparable orderingVal)
-  throws IOException {
+  public static HoodieRecordPayload createPayload(String payloadClass, 
GenericRecord record,
+  Comparable orderingVal,
+  String deleteField) throws 
IOException {

Review comment:
   deleteMarkerField

##
File path: 
hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestDeltaStreamerWithOverwriteLatestAvroPayload.java
##
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.functional;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
+import org.apache.hudi.utilities.sources.ParquetDFSSource;
+import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+
+public class TestDeltaStreamerWithOverwriteLatestAvroPayload extends 
UtilitiesTestBase {
+  private static String PARQUET_SOURCE_ROOT;
+  private static final String PROPS_FILENAME_TEST_PARQUET = 
"test-parquet-dfs-source.properties";
+
+  @BeforeAll
+  public static void initClass() throws Exception {
+UtilitiesTestBase.initClass(true);
+PARQUET_SOURCE_ROOT = dfsBasePath + "/parquetFiles";
+
+// prepare the configs.
+
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/base.properties", 
dfs, dfsBasePath + "/base.properties");
+
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/sql-transformer.properties",
 dfs,
+dfsBasePath + "/sql-transformer.properties");
+UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/source.avsc", 
dfs, dfsBasePath + "/source.avsc");
+
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/source-flattened.avsc",
 dfs, dfsBasePath + "/source-flattened.avsc");
+UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/target.avsc", 
dfs, dfsBasePath + "/target.avsc");
+  }
+
+  @Test
+  public void testOverwriteLatestAvroPayload() throws Exception {
+// test defaultDeleteField
+this.testOverwriteLatestAvroPayload(null);
+
+// test userDefinedDeleteField
+ 

[GitHub] [hudi] nsivabalan commented on a change in pull request #1819: [HUDI-1058] Make delete marker configurable

2020-07-26 Thread GitBox


nsivabalan commented on a change in pull request #1819:
URL: https://github.com/apache/hudi/pull/1819#discussion_r460415342



##
File path: 
hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java
##
@@ -36,6 +36,8 @@
 public class OverwriteWithLatestAvroPayload extends BaseAvroPayload
 implements HoodieRecordPayload {
 
+  private String deletedField = "_hoodie_is_deleted";

Review comment:
   Can we name it something like "deleteMarkerField"? I feel "deletedField" 
conveys that the field itself is deleted.

##
File path: 
hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala
##
@@ -100,6 +100,53 @@ class HoodieSparkSqlWriterSuite extends FunSuite with 
Matchers {
 }
   }
 
+  test("test OverwriteWithLatestAvroPayload with user defined delete field") {
+val session = SparkSession.builder()
+  .appName("test_append_mode")
+  .master("local[2]")
+  .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+  .getOrCreate()
+val path = java.nio.file.Files.createTempDirectory("hoodie_test_path1")
+
+try {
+  val sqlContext = session.sqlContext
+  val hoodieFooTableName = "hoodie_foo_tbl"
+
+  val keyField = "id"
+  val deleteField = "delete_field"
+
+  //create a new table
+  val fooTableModifier = Map("path" -> path.toAbsolutePath.toString,
+HoodieWriteConfig.TABLE_NAME -> hoodieFooTableName,
+"hoodie.insert.shuffle.parallelism" -> "2",
+"hoodie.upsert.shuffle.parallelism" -> "2",
+DELETE_FIELD_OPT_KEY -> deleteField,
+RECORDKEY_FIELD_OPT_KEY -> keyField)
+  val fooTableParams = 
HoodieSparkSqlWriter.parametersWithWriteDefaults(fooTableModifier)
+
+  val dataFrame = session.createDataFrame(Seq(
+(12, "ming", 20.23, "2018-01-01T13:51:39.340396Z", false),
+(34, "zhi", 21.323, "2018-01-01T13:52:39.340396Z", false)
+  )) toDF(keyField, "name", "weight", "ts", deleteField)
+
+  HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableParams, 
dataFrame)
+  val recordCount1 = 
sqlContext.read.format("org.apache.hudi").load(path.toString + 
"/*/*.parquet").count
+  assert(recordCount1 == 2, "result should be 2, but get " + recordCount1)
+
+  val dataFrame2 = session.createDataFrame(Seq(
+(12, "ming", 20.23, "2018-01-01T13:53:39.340396Z", true),
+(34, "zhi", 30.3, "2018-01-01T13:54:39.340396Z", false)
+  )) toDF(keyField, "name", "weight", "ts", deleteField)
+  HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableParams, 
dataFrame2)
+
+  val recordCount = 
sqlContext.read.format("org.apache.hudi").load(path.toString + 
"/*/*.parquet").count
+  assert(recordCount == 1, "result should be 1, but get " + recordCount)

Review comment:
   Can we also check that the remaining record actually matches the active 
one and not the deleted one?
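
One way to express that stronger check, sketched in plain Java rather than against the Spark test itself. The `upsert` helper and the in-memory `Map` table are hypothetical stand-ins for the Hudi write and read path; the row values mirror the ones in the diff above.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: assert on the identity and content of the surviving
// record, not only on the record count.
final class ActiveRecordCheckSketch {
  // Simplified upsert: a true marker removes the key, otherwise the row overwrites it.
  static void upsert(Map<Integer, Map<String, Object>> table,
                     List<Map<String, Object>> batch,
                     String keyField, String markerField) {
    for (Map<String, Object> row : batch) {
      Integer key = (Integer) row.get(keyField);
      if (Boolean.TRUE.equals(row.get(markerField))) {
        table.remove(key);
      } else {
        table.put(key, row);
      }
    }
  }

  static Map<String, Object> row(int id, String name, double weight, boolean deleted) {
    Map<String, Object> r = new LinkedHashMap<>();
    r.put("id", id);
    r.put("name", name);
    r.put("weight", weight);
    r.put("delete_field", deleted);
    return r;
  }

  public static void main(String[] args) {
    Map<Integer, Map<String, Object>> table = new LinkedHashMap<>();
    upsert(table, Arrays.asList(row(12, "ming", 20.23, false), row(34, "zhi", 21.323, false)),
        "id", "delete_field");
    upsert(table, Arrays.asList(row(12, "ming", 20.23, true), row(34, "zhi", 30.3, false)),
        "id", "delete_field");

    // The count alone would also pass if the wrong record had been dropped.
    System.out.println(table.size());                 // 1
    System.out.println(table.containsKey(34));        // true: the active record survived
    System.out.println(table.get(34).get("weight"));  // 30.3: and carries the updated value
  }
}
```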

##
File path: 
hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestDeltaStreamerWithOverwriteLatestAvroPayload.java
##
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.functional;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
+import org.apache.hudi.utilities.sources.ParquetDFSSource;
+import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+public class TestDeltaStreamerWithOverwriteLatestAvroPayload extends 
UtilitiesTestBase {
+  private static String PARQUET_SOURCE_ROOT;
+  private static final String PROPS_FILENAME_TEST_PARQUET = 
"test-parquet-dfs-source.properties";
+
+  @BeforeAll
+  public static void initClass() throws Exception {
+

[GitHub] [hudi] nsivabalan commented on a change in pull request #1819: [HUDI-1058] Make delete marker configurable

2020-07-20 Thread GitBox


nsivabalan commented on a change in pull request #1819:
URL: https://github.com/apache/hudi/pull/1819#discussion_r457479621



##
File path: 
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
##
@@ -337,9 +337,15 @@ private void refreshTimeline() throws IOException {
 }
 
 JavaRDD avroRDD = avroRDDOptional.get();
+if (writeClient == null) {
+  this.schemaProvider = schemaProvider;
+  setupWriteClient();

Review comment:
   Also wondering whether we really need to instantiate the config. Since it 
is just one property, can't we read it directly from TypedProperties? 
@bvaradar: do you have any thoughts on this? Basically we need to read just 
one config value for deleteMarker from the properties set. This step is a 
little ahead of where we instantiate writeClient, so wondering how to go about 
it.
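
A rough sketch of reading that single value with a default, without building a write client. It uses plain `java.util.Properties` as a stand-in for Hudi's `TypedProperties`; the key name is the one proposed in this PR's `DataSourceOptions` diff, and `DeleteMarkerConfigSketch` is a hypothetical holder class.

```java
import java.util.Properties;

// Hypothetical sketch: resolve the delete marker field from the properties alone.
final class DeleteMarkerConfigSketch {
  // Key as proposed in this PR; assumed here, it may differ in the merged code.
  static final String DELETE_FIELD_KEY = "hoodie.datasource.write.delete.field";
  static final String DEFAULT_DELETE_FIELD = "_hoodie_is_deleted";

  // Falls back to the default when the user has not set the property.
  static String deleteMarkerField(Properties props) {
    return props.getProperty(DELETE_FIELD_KEY, DEFAULT_DELETE_FIELD);
  }

  public static void main(String[] args) {
    Properties props = new Properties();
    System.out.println(deleteMarkerField(props)); // _hoodie_is_deleted

    props.setProperty(DELETE_FIELD_KEY, "delete_field");
    System.out.println(deleteMarkerField(props)); // delete_field
  }
}
```

Because the default is applied at read time, callers never see an empty or null field name and need no extra branching.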









[GitHub] [hudi] nsivabalan commented on a change in pull request #1819: [HUDI-1058] Make delete marker configurable

2020-07-20 Thread GitBox


nsivabalan commented on a change in pull request #1819:
URL: https://github.com/apache/hudi/pull/1819#discussion_r457472660



##
File path: 
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
##
@@ -337,9 +337,15 @@ private void refreshTimeline() throws IOException {
 }
 
 JavaRDD avroRDD = avroRDDOptional.get();
+if (writeClient == null) {
+  this.schemaProvider = schemaProvider;
+  setupWriteClient();

Review comment:
   All we need here is a config. I don't think we need to initialize 
writeClient here.

##
File path: 
hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java
##
@@ -66,8 +74,9 @@ public OverwriteWithLatestAvroPayload 
preCombine(OverwriteWithLatestAvroPayload
 }
 
 GenericRecord genericRecord = (GenericRecord) recordOption.get();
-// combining strategy here trivially ignores currentValue on disk and 
writes this record
-Object deleteMarker = genericRecord.get("_hoodie_is_deleted");
+// combining strategy here trivially ignores currentValue on disk and 
writes this record
+String deleteField = isDeletedField == null ? "_hoodie_is_deleted" : 
isDeletedField;

Review comment:
   Sorry, I didn't realize there was another constructor. We could then 
initialize isDeletedField = "_hoodie_is_deleted", so that one of the 
constructors will override the value.
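
The suggestion could look roughly like this. It is a simplified sketch, not the actual `OverwriteWithLatestAvroPayload`: a `Map` replaces `GenericRecord`, and the class name, constructors and `getInsertValue` signature are assumptions made for illustration.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical, dependency-free stand-in for the payload class under review.
final class DeleteMarkerPayloadSketch {
  // Initialised to the default; only the two-argument constructor replaces it,
  // so no null check is needed where the field is read.
  private String isDeletedField = "_hoodie_is_deleted";
  private final Map<String, Object> record;

  DeleteMarkerPayloadSketch(Map<String, Object> record) {
    this.record = record;
  }

  DeleteMarkerPayloadSketch(Map<String, Object> record, String isDeletedField) {
    this(record);
    this.isDeletedField = isDeletedField;
  }

  // An empty result means the record is treated as a delete.
  Optional<Map<String, Object>> getInsertValue() {
    boolean deleted = Boolean.TRUE.equals(record.get(isDeletedField));
    return deleted ? Optional.empty() : Optional.of(record);
  }
}
```

With the field initialised this way, the ternary fallback in the diff above becomes unnecessary.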









[GitHub] [hudi] nsivabalan commented on a change in pull request #1819: [HUDI-1058] Make delete marker configurable

2020-07-17 Thread GitBox


nsivabalan commented on a change in pull request #1819:
URL: https://github.com/apache/hudi/pull/1819#discussion_r456459962



##
File path: 
hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java
##
@@ -36,6 +36,9 @@
 public class OverwriteWithLatestAvroPayload extends BaseAvroPayload
 implements HoodieRecordPayload {
 
+  public static final String DEFAULT_DELETE_FIELD = "_hoodie_is_deleted";

Review comment:
   Sorry, I don't understand why we need this. The config value will either 
refer to "_hoodie_is_deleted" if not set, or to the user-defined prop. So we 
could initialize isDeletedField = null and set it in the constructor. Correct 
me if my understanding is wrong.









[GitHub] [hudi] nsivabalan commented on a change in pull request #1819: [HUDI-1058] Make delete marker configurable

2020-07-16 Thread GitBox


nsivabalan commented on a change in pull request #1819:
URL: https://github.com/apache/hudi/pull/1819#discussion_r455816566



##
File path: 
hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java
##
@@ -36,6 +36,9 @@
 public class OverwriteWithLatestAvroPayload extends BaseAvroPayload
 implements HoodieRecordPayload {
 
+  public static final String DEFAULT_DELETE_FIELD = "_hoodie_is_deleted";
+  private String userDefinedDeleteField = DEFAULT_DELETE_FIELD;

Review comment:
   No, I meant it as a field name referring to the column. That's why it is 
suffixed with "field".









[GitHub] [hudi] nsivabalan commented on a change in pull request #1819: [HUDI-1058] Make delete marker configurable

2020-07-16 Thread GitBox


nsivabalan commented on a change in pull request #1819:
URL: https://github.com/apache/hudi/pull/1819#discussion_r455700501



##
File path: hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala
##
@@ -184,6 +184,13 @@ object DataSourceWriteOptions {
   val PAYLOAD_CLASS_OPT_KEY = "hoodie.datasource.write.payload.class"
   val DEFAULT_PAYLOAD_OPT_VAL = classOf[OverwriteWithLatestAvroPayload].getName
 
+  /**
+   * Field used in OverwriteWithLatestAvroPayload combineAndGetUpdateValue, 
When two records have the same
+   * key value, we will check if the new record is deleted by the delete field.
+   */
+  val DELETE_FIELD_OPT_KEY = "hoodie.datasource.write.delete.field"
+  val DEFAULT_DELETE_FIELD_OPT_VAL = 
OverwriteWithLatestAvroPayload.DEFAULT_DELETE_FIELD

Review comment:
   You might as well define the default var in this class rather than in 
OverwriteWithLatestAvroPayload.

##
File path: 
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
##
@@ -202,6 +202,10 @@ public Operation convert(String value) throws 
ParameterException {
 + " to break ties between records with same key in input data. 
Default: 'ts' holding unix timestamp of record")
 public String sourceOrderingField = "ts";
 
+@Parameter(names = {"--source-delete-field"}, description = "Field within 
source record to decide"
++ " is this record is deleted. Default: " + 
OverwriteWithLatestAvroPayload.DEFAULT_DELETE_FIELD)

Review comment:
   This leakage is what I wish to avoid. Why would these classes access 
OverwriteWithLatestAvroPayload? Moving it to DataSourceUtils or some config 
class makes sense.

##
File path: 
hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java
##
@@ -36,6 +36,9 @@
 public class OverwriteWithLatestAvroPayload extends BaseAvroPayload
 implements HoodieRecordPayload {
 
+  public static final String DEFAULT_DELETE_FIELD = "_hoodie_is_deleted";
+  private String userDefinedDeleteField = DEFAULT_DELETE_FIELD;

Review comment:
   Can we name it "isDeletedField" or "isDeletedMarkerField"? It may or may 
not be user defined, hence the suggestion.









[GitHub] [hudi] nsivabalan commented on a change in pull request #1819: [HUDI-1058] Make delete marker configurable

2020-07-15 Thread GitBox


nsivabalan commented on a change in pull request #1819:
URL: https://github.com/apache/hudi/pull/1819#discussion_r455440158



##
File path: hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java
##
@@ -176,11 +177,21 @@ public static KeyGenerator 
createKeyGenerator(TypedProperties props) throws IOEx
   /**
* Create a payload class via reflection, passing in an ordering/precombine 
value.
*/
-  public static HoodieRecordPayload createPayload(String payloadClass, 
GenericRecord record, Comparable orderingVal)
-  throws IOException {
+  public static HoodieRecordPayload createPayload(String payloadClass, 
GenericRecord record,
+  Comparable orderingVal,
+  String deleteField) throws 
IOException {
 try {
-  return (HoodieRecordPayload) ReflectionUtils.loadClass(payloadClass,
-  new Class[] {GenericRecord.class, Comparable.class}, record, 
orderingVal);
+  HoodieRecordPayload payload = null;
+  if (payloadClass.equals(OverwriteWithLatestAvroPayload.class.getName()) 
&&
+  !deleteField.isEmpty()) {

Review comment:
   We can simplify this (I haven't gone through the entire patch yet). If 
you set the default value for this config param to "_hoodie_is_deleted", then 
you don't need to check for isEmpty. Also, within 
OverwriteWithLatestAvroPayload, you could just take in the deleteKeyField as 
is: it will point to "_hoodie_is_deleted" if not set, or else to the one the 
user has overridden.
   








