[GitHub] [hudi] nsivabalan commented on a change in pull request #1819: [HUDI-1058] Make delete marker configurable
nsivabalan commented on a change in pull request #1819:
URL: https://github.com/apache/hudi/pull/1819#discussion_r463979191

## File path: hudi-client/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java

```diff
@@ -267,15 +268,15 @@ private boolean deleteAllInstantsOlderorEqualsInAuxMetaFolder(HoodieInstant thre
     return success;
   }

-  public void archive(List instants) throws HoodieCommitException {
+  public void archive(JavaSparkContext jsc, List instants) throws HoodieCommitException {
     try {
       HoodieTimeline commitTimeline = metaClient.getActiveTimeline().getAllCommitsTimeline().filterCompletedInstants();
       Schema wrapperSchema = HoodieArchivedMetaEntry.getClassSchema();
       LOG.info("Wrapper schema " + wrapperSchema.toString());
       List records = new ArrayList<>();
       for (HoodieInstant hoodieInstant : instants) {
         try {
-          deleteAnyLeftOverMarkerFiles(hoodieInstant);
+          deleteAnyLeftOverMarkerFiles(jsc, hoodieInstant);
```

Review comment: Sorry, why make these changes in this PR? This PR is meant for the delete marker field. Are these changes related to the user-defined delete marker field?

## File path: hudi-common/src/test/java/org/apache/hudi/common/model/TestOverwriteWithLatestAvroPayload.java

```diff
@@ -54,13 +57,15 @@ public void testActiveRecords() throws IOException {
     record1.put("id", "1");
     record1.put("partition", "partition0");
     record1.put("ts", 0L);
-    record1.put("_hoodie_is_deleted", false);
+    record1.put(defaultDeleteMarkerField, false);
```

Review comment: Actually, we could test it this way (not sure if you already do): set the default marker field value to true and the user-defined one to false. If OverwriteWithLatestAvro is instantiated without any marker field, the record should be deleted; if it is instantiated with the user-defined marker field, the record should be considered active. And vice versa. All tests in this class could be done this way, to ensure that the other column is treated as just another user data column that Hudi does not care about.
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
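The testing strategy suggested in the comment above (set the default marker field to true and the user-defined one to false, then check which field the payload honors) can be sketched with plain Java maps. This is a hypothetical stand-in for illustration only; `DeleteMarkerResolver` and its methods are not Hudi's actual `OverwriteWithLatestAvroPayload` API.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of a configurable delete-marker check. The field name
// defaults to "_hoodie_is_deleted" and can be overridden per instance.
public class DeleteMarkerResolver {
    static final String DEFAULT_MARKER_FIELD = "_hoodie_is_deleted";
    private final String markerField;

    public DeleteMarkerResolver() {
        this.markerField = DEFAULT_MARKER_FIELD;
    }

    public DeleteMarkerResolver(String markerField) {
        this.markerField = markerField;
    }

    // A record counts as deleted only if its configured marker field is true;
    // any other marker column is treated as ordinary user data.
    public boolean isDeleted(Map<String, Object> record) {
        Object marker = record.get(markerField);
        return marker instanceof Boolean && (Boolean) marker;
    }

    public static void main(String[] args) {
        Map<String, Object> record = new HashMap<>();
        record.put(DEFAULT_MARKER_FIELD, true);   // default marker says "deleted"
        record.put("delete_field", false);        // user-defined marker says "active"

        System.out.println(new DeleteMarkerResolver().isDeleted(record));                // true
        System.out.println(new DeleteMarkerResolver("delete_field").isDeleted(record));  // false
    }
}
```

With this setup, each existing test could run twice, once per constructor, asserting opposite outcomes on the same record.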
[GitHub] [hudi] nsivabalan commented on a change in pull request #1819: [HUDI-1058] Make delete marker configurable
nsivabalan commented on a change in pull request #1819:
URL: https://github.com/apache/hudi/pull/1819#discussion_r463548033

## File path: hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java

```diff
@@ -177,6 +178,18 @@ public RawTripTestPayload generateRandomValueAsPerSchema(String schemaStr, Hoodi
     return null;
   }

+  public static List genericRecords(int n, boolean isDeleteRecord, int instantTime) {
```

Review comment: generateGenericRecords.

## File path: hudi-common/src/test/java/org/apache/hudi/common/model/TestOverwriteWithLatestAvroPayload.java

```diff
@@ -37,14 +37,17 @@ public class TestOverwriteWithLatestAvroPayload {

   private Schema schema;
+  String defaultDeleteField = "_hoodie_is_deleted";
+  String deleteField = "delete_field";
```

Review comment: deleteMarkerField

## File path: hudi-common/src/test/java/org/apache/hudi/common/model/TestOverwriteWithLatestAvroPayload.java

```diff
@@ -54,13 +57,15 @@ public void testActiveRecords() throws IOException {
     record1.put("id", "1");
     record1.put("partition", "partition0");
     record1.put("ts", 0L);
-    record1.put("_hoodie_is_deleted", false);
+    record1.put(defaultDeleteField, false);
+    record1.put(deleteField, true);
```

Review comment: Shouldn't this also be false?

## File path: hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java

```diff
@@ -215,11 +216,20 @@ public static HoodieDateTimeParser createDateTimeParser(TypedProperties props, S
   /**
    * Create a payload class via reflection, passing in an ordering/precombine value.
    */
-  public static HoodieRecordPayload createPayload(String payloadClass, GenericRecord record, Comparable orderingVal)
-      throws IOException {
+  public static HoodieRecordPayload createPayload(String payloadClass, GenericRecord record,
+                                                  Comparable orderingVal,
+                                                  String deleteField) throws IOException {
```

Review comment: deleteMarkerField

## File path: hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestDeltaStreamerWithOverwriteLatestAvroPayload.java

```diff
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.functional;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
+import org.apache.hudi.utilities.sources.ParquetDFSSource;
+import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+
+public class TestDeltaStreamerWithOverwriteLatestAvroPayload extends UtilitiesTestBase {
+  private static String PARQUET_SOURCE_ROOT;
+  private static final String PROPS_FILENAME_TEST_PARQUET = "test-parquet-dfs-source.properties";
+
+  @BeforeAll
+  public static void initClass() throws Exception {
+    UtilitiesTestBase.initClass(true);
+    PARQUET_SOURCE_ROOT = dfsBasePath + "/parquetFiles";
+
+    // prepare the configs.
+    UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/base.properties", dfs, dfsBasePath + "/base.properties");
+    UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/sql-transformer.properties", dfs,
+        dfsBasePath + "/sql-transformer.properties");
+    UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/source.avsc", dfs, dfsBasePath + "/source.avsc");
+    UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/source-flattened.avsc", dfs, dfsBasePath + "/source-flattened.avsc");
+    UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/target.avsc", dfs, dfsBasePath + "/target.avsc");
+  }
+
+  @Test
+  public void testOverwriteLatestAvroPayload() throws Exception {
+    // test defaultDeleteField
+    this.testOverwriteLatestAvroPayload(null);
+
+    // test userDefinedDeleteField
+
```
[GitHub] [hudi] nsivabalan commented on a change in pull request #1819: [HUDI-1058] Make delete marker configurable
nsivabalan commented on a change in pull request #1819:
URL: https://github.com/apache/hudi/pull/1819#discussion_r460415342

## File path: hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java

```diff
@@ -36,6 +36,8 @@
 public class OverwriteWithLatestAvroPayload extends BaseAvroPayload implements HoodieRecordPayload {

+  private String deletedField = "_hoodie_is_deleted";
```

Review comment: Can we name it something like "deleteMarkerField"? "deletedField" reads as if the field itself is deleted.

## File path: hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala

```diff
@@ -100,6 +100,53 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
     }
   }

+  test("test OverwriteWithLatestAvroPayload with user defined delete field") {
+    val session = SparkSession.builder()
+      .appName("test_append_mode")
+      .master("local[2]")
+      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+      .getOrCreate()
+    val path = java.nio.file.Files.createTempDirectory("hoodie_test_path1")
+
+    try {
+      val sqlContext = session.sqlContext
+      val hoodieFooTableName = "hoodie_foo_tbl"
+
+      val keyField = "id"
+      val deleteField = "delete_field"
+
+      // create a new table
+      val fooTableModifier = Map("path" -> path.toAbsolutePath.toString,
+        HoodieWriteConfig.TABLE_NAME -> hoodieFooTableName,
+        "hoodie.insert.shuffle.parallelism" -> "2",
+        "hoodie.upsert.shuffle.parallelism" -> "2",
+        DELETE_FIELD_OPT_KEY -> deleteField,
+        RECORDKEY_FIELD_OPT_KEY -> keyField)
+      val fooTableParams = HoodieSparkSqlWriter.parametersWithWriteDefaults(fooTableModifier)
+
+      val dataFrame = session.createDataFrame(Seq(
+        (12, "ming", 20.23, "2018-01-01T13:51:39.340396Z", false),
+        (34, "zhi", 21.323, "2018-01-01T13:52:39.340396Z", false)
+      )) toDF(keyField, "name", "weight", "ts", deleteField)
+
+      HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableParams, dataFrame)
+      val recordCount1 = sqlContext.read.format("org.apache.hudi").load(path.toString + "/*/*.parquet").count
+      assert(recordCount1 == 2, "result should be 2, but get " + recordCount1)
+
+      val dataFrame2 = session.createDataFrame(Seq(
+        (12, "ming", 20.23, "2018-01-01T13:53:39.340396Z", true),
+        (34, "zhi", 30.3, "2018-01-01T13:54:39.340396Z", false)
+      )) toDF(keyField, "name", "weight", "ts", deleteField)
+      HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableParams, dataFrame2)
+
+      val recordCount = sqlContext.read.format("org.apache.hudi").load(path.toString + "/*/*.parquet").count
+      assert(recordCount == 1, "result should be 1, but get " + recordCount)
```

Review comment: Can we also check that the remaining record actually matches the active one, and not the deleted one?

## File path: hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestDeltaStreamerWithOverwriteLatestAvroPayload.java

```diff
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.functional;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
+import org.apache.hudi.utilities.sources.ParquetDFSSource;
+import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+public class TestDeltaStreamerWithOverwriteLatestAvroPayload extends UtilitiesTestBase {
+  private static String PARQUET_SOURCE_ROOT;
+  private static final String PROPS_FILENAME_TEST_PARQUET = "test-parquet-dfs-source.properties";
+
+  @BeforeAll
+  public static void initClass() throws Exception {
+    Uti
```
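The review above asks the test to verify the surviving row's contents, not just the row count. The sketch below mimics overwrite-with-latest upsert semantics on a keyed in-memory map (an assumption for illustration only, not Hudi's actual write path), reusing the ids, values, and "delete_field" marker column from the Scala test.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of checking the surviving record after an upsert that carries a
// delete. upsert() applies each incoming record by key: a record whose
// configured delete-marker field is true removes the row, otherwise the
// latest record overwrites it.
public class SurvivorCheck {
    public static void upsert(Map<Integer, Map<String, Object>> table,
                              List<Map<String, Object>> batch, String deleteMarkerField) {
        for (Map<String, Object> rec : batch) {
            Integer id = (Integer) rec.get("id");
            if (Boolean.TRUE.equals(rec.get(deleteMarkerField))) {
                table.remove(id);      // incoming delete removes the existing row
            } else {
                table.put(id, rec);    // latest record overwrites the existing row
            }
        }
    }

    public static Map<String, Object> rec(int id, String name, double weight, boolean deleted) {
        Map<String, Object> r = new HashMap<>();
        r.put("id", id);
        r.put("name", name);
        r.put("weight", weight);
        r.put("delete_field", deleted);
        return r;
    }

    public static void main(String[] args) {
        Map<Integer, Map<String, Object>> table = new HashMap<>();
        upsert(table, List.of(rec(12, "ming", 20.23, false), rec(34, "zhi", 21.323, false)), "delete_field");
        upsert(table, List.of(rec(12, "ming", 20.23, true), rec(34, "zhi", 30.3, false)), "delete_field");

        // Not just the count: the survivor must be the active, updated record.
        System.out.println(table.size());                 // 1
        System.out.println(table.get(34).get("weight"));  // 30.3
    }
}
```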
[GitHub] [hudi] nsivabalan commented on a change in pull request #1819: [HUDI-1058] Make delete marker configurable
nsivabalan commented on a change in pull request #1819:
URL: https://github.com/apache/hudi/pull/1819#discussion_r457479621

## File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java

```diff
@@ -337,9 +337,15 @@ private void refreshTimeline() throws IOException {
     }
     JavaRDD avroRDD = avroRDDOptional.get();
+    if (writeClient == null) {
+      this.schemaProvider = schemaProvider;
+      setupWriteClient();
```

Review comment: Also wondering whether we really need to instantiate the config. Since it is just one property, can't we read it directly from TypedProperties? @bvaradar: do you have any thoughts on this? Basically we need to read just one config value, the delete marker, from the properties set. This step happens a little ahead of where we instantiate the writeClient, so I am wondering how to go about it.
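The comment above asks whether a single config value can be read straight from the properties instead of building a write client. A minimal sketch of that idea, using `java.util.Properties` as a stand-in for Hudi's `TypedProperties` (the key name `hoodie.datasource.write.delete.field` is taken from this PR's DataSourceOptions discussion; treat both the key and the class as assumptions):

```java
import java.util.Properties;

// Read one config value with a fallback default, without instantiating
// any write config or write client.
public class DeleteMarkerConfig {
    static final String DELETE_FIELD_KEY = "hoodie.datasource.write.delete.field";
    static final String DEFAULT_DELETE_FIELD = "_hoodie_is_deleted";

    public static String deleteMarkerField(Properties props) {
        return props.getProperty(DELETE_FIELD_KEY, DEFAULT_DELETE_FIELD);
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        System.out.println(deleteMarkerField(props));   // prints _hoodie_is_deleted

        props.setProperty(DELETE_FIELD_KEY, "delete_field");
        System.out.println(deleteMarkerField(props));   // prints delete_field
    }
}
```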
[GitHub] [hudi] nsivabalan commented on a change in pull request #1819: [HUDI-1058] Make delete marker configurable
nsivabalan commented on a change in pull request #1819:
URL: https://github.com/apache/hudi/pull/1819#discussion_r457472660

## File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java

```diff
@@ -337,9 +337,15 @@ private void refreshTimeline() throws IOException {
     }
     JavaRDD avroRDD = avroRDDOptional.get();
+    if (writeClient == null) {
+      this.schemaProvider = schemaProvider;
+      setupWriteClient();
```

Review comment: All we need is a config here; I don't think we need to initialize the writeClient.

## File path: hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java

```diff
@@ -66,8 +74,9 @@ public OverwriteWithLatestAvroPayload preCombine(OverwriteWithLatestAvroPayload
     }
     GenericRecord genericRecord = (GenericRecord) recordOption.get();
     // combining strategy here trivially ignores currentValue on disk and writes this record
-    Object deleteMarker = genericRecord.get("_hoodie_is_deleted");
+    String deleteField = isDeletedField == null ? "_hoodie_is_deleted" : isDeletedField;
```

Review comment: Sorry, I didn't realize the other constructor. We could then initialize isDeletedField = "_hoodie_is_deleted", so that one of the constructors overrides the value.
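The fix suggested above, initializing the field at declaration so one constructor can override it and the null check in preCombine disappears, looks roughly like this. This is a hypothetical sketch; the names mirror the discussion, not the final Hudi code.

```java
// Constructor-defaulting pattern: the field is never null, so callers such as
// preCombine can use it directly without a null check.
public class PayloadSketch {
    private String isDeletedField = "_hoodie_is_deleted";

    public PayloadSketch() {
        // keeps the default "_hoodie_is_deleted"
    }

    public PayloadSketch(String deleteMarkerField) {
        // user-defined marker field overrides the default
        this.isDeletedField = deleteMarkerField;
    }

    public String deleteMarkerField() {
        return isDeletedField;
    }
}
```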
[GitHub] [hudi] nsivabalan commented on a change in pull request #1819: [HUDI-1058] Make delete marker configurable
nsivabalan commented on a change in pull request #1819:
URL: https://github.com/apache/hudi/pull/1819#discussion_r456459962

## File path: hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java

```diff
@@ -36,6 +36,9 @@
 public class OverwriteWithLatestAvroPayload extends BaseAvroPayload implements HoodieRecordPayload {

+  public static final String DEFAULT_DELETE_FIELD = "_hoodie_is_deleted";
```

Review comment: Sorry, I don't understand why we need this. The config value will either refer to "_hoodie_is_deleted" if not set, or to the user-defined property. So we could initialize isDeletedField = null and set it in the constructor. Correct me if my understanding is wrong.
[GitHub] [hudi] nsivabalan commented on a change in pull request #1819: [HUDI-1058] Make delete marker configurable
nsivabalan commented on a change in pull request #1819:
URL: https://github.com/apache/hudi/pull/1819#discussion_r455816566

## File path: hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java

```diff
@@ -36,6 +36,9 @@
 public class OverwriteWithLatestAvroPayload extends BaseAvroPayload implements HoodieRecordPayload {

+  public static final String DEFAULT_DELETE_FIELD = "_hoodie_is_deleted";
+  private String userDefinedDeleteField = DEFAULT_DELETE_FIELD;
```

Review comment: No, I meant it as the field name referring to the column; that's why it is suffixed with "field".
[GitHub] [hudi] nsivabalan commented on a change in pull request #1819: [HUDI-1058] Make delete marker configurable
nsivabalan commented on a change in pull request #1819:
URL: https://github.com/apache/hudi/pull/1819#discussion_r455700501

## File path: hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala

```diff
@@ -184,6 +184,13 @@ object DataSourceWriteOptions {
   val PAYLOAD_CLASS_OPT_KEY = "hoodie.datasource.write.payload.class"
   val DEFAULT_PAYLOAD_OPT_VAL = classOf[OverwriteWithLatestAvroPayload].getName

+  /**
+   * Field used in OverwriteWithLatestAvroPayload combineAndGetUpdateValue, When two records have the same
+   * key value, we will check if the new record is deleted by the delete field.
+   */
+  val DELETE_FIELD_OPT_KEY = "hoodie.datasource.write.delete.field"
+  val DEFAULT_DELETE_FIELD_OPT_VAL = OverwriteWithLatestAvroPayload.DEFAULT_DELETE_FIELD
```

Review comment: You might as well define the default value in this class rather than in OverwriteWithLatestAvroPayload.

## File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java

```diff
@@ -202,6 +202,10 @@ public Operation convert(String value) throws ParameterException {
         + " to break ties between records with same key in input data. Default: 'ts' holding unix timestamp of record")
     public String sourceOrderingField = "ts";

+    @Parameter(names = {"--source-delete-field"}, description = "Field within source record to decide"
+        + " is this record is deleted. Default: " + OverwriteWithLatestAvroPayload.DEFAULT_DELETE_FIELD)
```

Review comment: This leakage is what I wish to avoid. Why would these classes access OverwriteWithLatestAvroPayload? Moving it to DataSourceUtils or some config class makes sense.

## File path: hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java

```diff
@@ -36,6 +36,9 @@
 public class OverwriteWithLatestAvroPayload extends BaseAvroPayload implements HoodieRecordPayload {

+  public static final String DEFAULT_DELETE_FIELD = "_hoodie_is_deleted";
+  private String userDefinedDeleteField = DEFAULT_DELETE_FIELD;
```

Review comment: Can we name it "isDeletedField" or "isDeletedMarkerField"? It may or may not be user defined, hence the suggestion.
[GitHub] [hudi] nsivabalan commented on a change in pull request #1819: [HUDI-1058] Make delete marker configurable
nsivabalan commented on a change in pull request #1819:
URL: https://github.com/apache/hudi/pull/1819#discussion_r455440158

## File path: hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java

```diff
@@ -176,11 +177,21 @@ public static KeyGenerator createKeyGenerator(TypedProperties props) throws IOEx
   /**
    * Create a payload class via reflection, passing in an ordering/precombine value.
   */
-  public static HoodieRecordPayload createPayload(String payloadClass, GenericRecord record, Comparable orderingVal)
-      throws IOException {
+  public static HoodieRecordPayload createPayload(String payloadClass, GenericRecord record,
+                                                  Comparable orderingVal,
+                                                  String deleteField) throws IOException {
     try {
-      return (HoodieRecordPayload) ReflectionUtils.loadClass(payloadClass,
-          new Class[] {GenericRecord.class, Comparable.class}, record, orderingVal);
+      HoodieRecordPayload payload = null;
+      if (payloadClass.equals(OverwriteWithLatestAvroPayload.class.getName()) &&
+          !deleteField.isEmpty()) {
```

Review comment: We can simplify this (I haven't gone through the entire patch yet). If you set the default value for this config param to "_hoodie_is_deleted", then you don't need to check for isEmpty. Also, within OverwriteWithLatestAvroPayload, you could just take in the deleteKeyField as is: it will either point to "_hoodie_is_deleted" if not set, or to the one the user has overridden.
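The simplification suggested above (default the config to "_hoodie_is_deleted" upstream so createPayload can pass it through unconditionally) can be sketched with plain reflection. `DummyPayload` is a hypothetical stand-in for a `HoodieRecordPayload`; the reflective lookup loosely mirrors what a helper like `ReflectionUtils.loadClass` would do.

```java
import java.lang.reflect.Constructor;

// Sketch: because deleteMarkerField is defaulted by the caller, the factory
// never needs an isEmpty() branch and always uses the marker-aware constructor.
public class PayloadFactory {

    public static class DummyPayload {
        public final String record;
        public final String deleteMarkerField;

        public DummyPayload(String record, String deleteMarkerField) {
            this.record = record;
            this.deleteMarkerField = deleteMarkerField;
        }
    }

    public static DummyPayload createPayload(String payloadClass, String record,
                                             String deleteMarkerField) throws Exception {
        // deleteMarkerField is never empty here: callers default it upstream.
        Constructor<?> ctor = Class.forName(payloadClass)
                .getConstructor(String.class, String.class);
        return (DummyPayload) ctor.newInstance(record, deleteMarkerField);
    }

    public static void main(String[] args) throws Exception {
        DummyPayload p = createPayload(DummyPayload.class.getName(), "rec-1", "_hoodie_is_deleted");
        System.out.println(p.deleteMarkerField);  // prints _hoodie_is_deleted
    }
}
```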