This is an automated email from the ASF dual-hosted git repository. vinoth pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push: new ab11ba4 [REVERT] "[HUDI-1058] Make delete marker configurable (#1819)" (#1914) ab11ba4 is described below commit ab11ba43e1a5496cf85a7a772929bb90fcbf07d3 Author: Sivabalan Narayanan <sivab...@uber.com> AuthorDate: Tue Aug 4 18:20:38 2020 -0400 [REVERT] "[HUDI-1058] Make delete marker configurable (#1819)" (#1914) This reverts commit 433d7d2c9886fed161557efe88b62ebdce0fe5df. --- .../org/apache/hudi/config/HoodieWriteConfig.java | 9 -- .../model/OverwriteWithLatestAvroPayload.java | 10 +-- .../model/TestOverwriteWithLatestAvroPayload.java | 67 +++++---------- .../common/testutils/HoodieTestDataGenerator.java | 27 ++---- .../main/java/org/apache/hudi/DataSourceUtils.java | 23 ++--- .../SparkParquetBootstrapDataProvider.java | 2 +- .../scala/org/apache/hudi/DataSourceOptions.scala | 7 -- .../org/apache/hudi/HoodieSparkSqlWriter.scala | 5 +- .../functional/HoodieSparkSqlWriterSuite.scala | 46 ---------- .../hudi/utilities/deltastreamer/DeltaSync.java | 6 +- ...eltaStreamerWithOverwriteLatestAvroPayload.java | 97 ---------------------- .../resources/delta-streamer-config/source.avsc | 4 - .../sql-transformer.properties | 2 +- .../resources/delta-streamer-config/target.avsc | 4 - 14 files changed, 43 insertions(+), 266 deletions(-) diff --git a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index affe553..80bc17e 100644 --- a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -94,9 +94,6 @@ public class HoodieWriteConfig extends DefaultHoodieConfig { public static final String BULKINSERT_SORT_MODE = "hoodie.bulkinsert.sort.mode"; public static final String DEFAULT_BULKINSERT_SORT_MODE = BulkInsertSortMode.GLOBAL_SORT .toString(); - public static final String DELETE_MARKER_FIELD_PROP = "hoodie.write.delete.marker.field"; - public static final String DEFAULT_DELETE_MARKER_FIELD = "_hoodie_is_deleted"; - public static final String EMBEDDED_TIMELINE_SERVER_ENABLED = "hoodie.embed.timeline.server"; public static final String DEFAULT_EMBEDDED_TIMELINE_SERVER_ENABLED = "true"; @@ -277,10 +274,6 @@ public class HoodieWriteConfig extends DefaultHoodieConfig { return BulkInsertSortMode.valueOf(sortMode.toUpperCase()); } - public String getDeleteMarkerField() { - return props.getProperty(DELETE_MARKER_FIELD_PROP); - } - /** * compaction properties. */ @@ -964,8 +957,6 @@ public class HoodieWriteConfig extends DefaultHoodieConfig { setDefaultOnCondition(props, !props.containsKey(AVRO_SCHEMA_VALIDATE), AVRO_SCHEMA_VALIDATE, DEFAULT_AVRO_SCHEMA_VALIDATE); setDefaultOnCondition(props, !props.containsKey(BULKINSERT_SORT_MODE), BULKINSERT_SORT_MODE, DEFAULT_BULKINSERT_SORT_MODE); - setDefaultOnCondition(props, !props.containsKey(DELETE_MARKER_FIELD_PROP), - DELETE_MARKER_FIELD_PROP, DEFAULT_DELETE_MARKER_FIELD); // Make sure the props is propagated setDefaultOnCondition(props, !isIndexConfigSet, HoodieIndexConfig.newBuilder().fromProperties(props).build()); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java b/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java index 0e4b18a..d8dffdf 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java @@ -36,8 +36,6 @@ import java.io.IOException; public class OverwriteWithLatestAvroPayload extends BaseAvroPayload implements HoodieRecordPayload<OverwriteWithLatestAvroPayload> { - private String deleteMarkerField = "_hoodie_is_deleted"; - /** * */ @@ -49,12 +47,6 @@ public class OverwriteWithLatestAvroPayload extends BaseAvroPayload this(record.isPresent() ? record.get() : null, (record1) -> 0); // natural order } - public OverwriteWithLatestAvroPayload(GenericRecord record, Comparable orderingVal, - String deleteMarkerField) { - this(record, orderingVal); - this.deleteMarkerField = deleteMarkerField; - } - @Override public OverwriteWithLatestAvroPayload preCombine(OverwriteWithLatestAvroPayload another) { // pick the payload with greatest ordering value @@ -88,7 +80,7 @@ public class OverwriteWithLatestAvroPayload extends BaseAvroPayload * @returns {@code true} if record represents a delete record. {@code false} otherwise. */ private boolean isDeleteRecord(GenericRecord genericRecord) { - Object deleteMarker = genericRecord.get(deleteMarkerField); + Object deleteMarker = genericRecord.get("_hoodie_is_deleted"); return (deleteMarker instanceof Boolean && (boolean) deleteMarker); } } diff --git a/hudi-common/src/test/java/org/apache/hudi/common/model/TestOverwriteWithLatestAvroPayload.java b/hudi-common/src/test/java/org/apache/hudi/common/model/TestOverwriteWithLatestAvroPayload.java index e212367..7c5951a 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/model/TestOverwriteWithLatestAvroPayload.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/model/TestOverwriteWithLatestAvroPayload.java @@ -37,8 +37,6 @@ import static org.junit.jupiter.api.Assertions.assertFalse; public class TestOverwriteWithLatestAvroPayload { private Schema schema; - String defaultDeleteMarkerField = "_hoodie_is_deleted"; - String deleteMarkerField = "delete_marker_field"; @BeforeEach public void setUp() throws Exception { @@ -46,56 +44,26 @@ public class TestOverwriteWithLatestAvroPayload { new Schema.Field("id", Schema.create(Schema.Type.STRING), "", null), new Schema.Field("partition", Schema.create(Schema.Type.STRING), "", null), new Schema.Field("ts", Schema.create(Schema.Type.LONG), "", null), - new Schema.Field(defaultDeleteMarkerField, Schema.create(Type.BOOLEAN), "", false), - new Schema.Field(deleteMarkerField, Schema.create(Type.BOOLEAN), "", false) + new Schema.Field("_hoodie_is_deleted", Schema.create(Type.BOOLEAN), "", false) )); } @Test - public void testOverwriteWithLatestAvroPayload() throws IOException { + public void testActiveRecords() throws IOException { GenericRecord record1 = new GenericData.Record(schema); record1.put("id", "1"); record1.put("partition", "partition0"); record1.put("ts", 0L); - record1.put(defaultDeleteMarkerField, false); - record1.put(deleteMarkerField, false); + record1.put("_hoodie_is_deleted", false); - // test1: set default marker field value to true and user defined to false GenericRecord record2 = new GenericData.Record(schema); record2.put("id", "2"); record2.put("partition", "partition1"); record2.put("ts", 1L); - record2.put(defaultDeleteMarkerField, true); - record2.put(deleteMarkerField, false); - - // set to user defined marker field with false, the record should be considered active. - assertActiveRecord(record1, record2, deleteMarkerField); - - // set to default marker field with true, the record should be considered delete. - assertDeletedRecord(record1, record2, defaultDeleteMarkerField); - - // test2: set default marker field value to false and user defined to true - GenericRecord record3 = new GenericData.Record(schema); - record3.put("id", "2"); - record3.put("partition", "partition1"); - record3.put("ts", 1L); - record3.put(defaultDeleteMarkerField, false); - record3.put(deleteMarkerField, true); - - // set to user defined marker field with true, the record should be considered delete. - assertDeletedRecord(record1, record3, deleteMarkerField); - - // set to default marker field with false, the record should be considered active. - assertActiveRecord(record1, record3, defaultDeleteMarkerField); - } - - private void assertActiveRecord(GenericRecord record1, - GenericRecord record2, String field) throws IOException { - OverwriteWithLatestAvroPayload payload1 = new OverwriteWithLatestAvroPayload( - record1, 1, field); - OverwriteWithLatestAvroPayload payload2 = new OverwriteWithLatestAvroPayload( - record2, 2, field); + record2.put("_hoodie_is_deleted", false); + OverwriteWithLatestAvroPayload payload1 = new OverwriteWithLatestAvroPayload(record1, 1); + OverwriteWithLatestAvroPayload payload2 = new OverwriteWithLatestAvroPayload(record2, 2); assertEquals(payload1.preCombine(payload2), payload2); assertEquals(payload2.preCombine(payload1), payload2); @@ -106,12 +74,22 @@ public class TestOverwriteWithLatestAvroPayload { assertEquals(payload2.combineAndGetUpdateValue(record1, schema).get(), record2); } - private void assertDeletedRecord(GenericRecord record1, - GenericRecord delRecord1, String field) throws IOException { - OverwriteWithLatestAvroPayload payload1 = new OverwriteWithLatestAvroPayload( - record1, 1, field); - OverwriteWithLatestAvroPayload payload2 = new OverwriteWithLatestAvroPayload( - delRecord1, 2, field); + @Test + public void testDeletedRecord() throws IOException { + GenericRecord record1 = new GenericData.Record(schema); + record1.put("id", "1"); + record1.put("partition", "partition0"); + record1.put("ts", 0L); + record1.put("_hoodie_is_deleted", false); + + GenericRecord delRecord1 = new GenericData.Record(schema); + delRecord1.put("id", "2"); + delRecord1.put("partition", "partition1"); + delRecord1.put("ts", 1L); + delRecord1.put("_hoodie_is_deleted", true); + + OverwriteWithLatestAvroPayload payload1 = new OverwriteWithLatestAvroPayload(record1, 1); + OverwriteWithLatestAvroPayload payload2 = new OverwriteWithLatestAvroPayload(delRecord1, 2); assertEquals(payload1.preCombine(payload2), payload2); assertEquals(payload2.preCombine(payload1), payload2); @@ -121,4 +99,5 @@ public class TestOverwriteWithLatestAvroPayload { assertEquals(payload1.combineAndGetUpdateValue(delRecord1, schema).get(), record1); assertFalse(payload2.combineAndGetUpdateValue(record1, schema).isPresent()); } + } diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java index 6c56ff1..1ead5ff 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java @@ -106,7 +106,6 @@ public class HoodieTestDataGenerator { + "{\"name\": \"seconds_since_epoch\", \"type\": \"long\"}," + "{\"name\": \"weight\", \"type\": \"float\"}," + "{\"name\": \"nation\", \"type\": \"bytes\"}," - + "{\"name\": \"user_defined_delete_marker_field\", \"type\": \"boolean\", \"default\": false}," + "{\"name\":\"current_date\",\"type\": {\"type\": \"int\", \"logicalType\": \"date\"}}," + "{\"name\":\"current_ts\",\"type\": {\"type\": \"long\"}}," + "{\"name\":\"height\",\"type\":{\"type\":\"fixed\",\"name\":\"abc\",\"size\":5,\"logicalType\":\"decimal\",\"precision\":10,\"scale\":6}},"; @@ -124,7 +123,7 @@ public class HoodieTestDataGenerator { + "{\"name\":\"driver\",\"type\":\"string\"},{\"name\":\"fare\",\"type\":\"double\"},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false}]}"; public static final String NULL_SCHEMA = Schema.create(Schema.Type.NULL).toString(); - public static final String TRIP_HIVE_COLUMN_TYPES = "double,string,string,string,double,double,double,double,int,bigint,float,binary,boolean,int,bigint,decimal(10,6)," + public static final String TRIP_HIVE_COLUMN_TYPES = "double,string,string,string,double,double,double,double,int,bigint,float,binary,int,bigint,decimal(10,6)," + "map<string,string>,struct<amount:double,currency:string>,array<struct<amount:double,currency:string>>,boolean"; @@ -180,18 +179,6 @@ public class HoodieTestDataGenerator { return null; } - public static List<GenericRecord> generateGenericRecords(int n, boolean isDeleteRecord, int instantTime) { - return IntStream.range(0, n).boxed().map(i -> { - String partitionPath = DEFAULT_FIRST_PARTITION_PATH; - HoodieKey key = new HoodieKey("id_" + i, partitionPath); - HoodieTestDataGenerator.KeyPartition kp = new HoodieTestDataGenerator.KeyPartition(); - kp.key = key; - kp.partitionPath = partitionPath; - return HoodieTestDataGenerator.generateGenericRecord( - key.getRecordKey(), "rider-" + instantTime, "driver-" + instantTime, instantTime, isDeleteRecord, false); - }).collect(Collectors.toList()); - } - /** * Generates a new avro record of the above nested schema format, * retaining the key if optionally provided. @@ -278,11 +265,11 @@ public class HoodieTestDataGenerator { rec.put("weight", RAND.nextFloat()); byte[] bytes = "Canada".getBytes(); rec.put("nation", ByteBuffer.wrap(bytes)); - rec.put("user_defined_delete_marker_field", isDeleteRecord); long currentTimeMillis = System.currentTimeMillis(); Date date = new Date(currentTimeMillis); rec.put("current_date", (int) date.toLocalDate().toEpochDay()); rec.put("current_ts", currentTimeMillis); + BigDecimal bigDecimal = new BigDecimal(String.format("%5f", RAND.nextFloat())); Schema decimalSchema = AVRO_SCHEMA.getField("height").schema(); Conversions.DecimalConversion decimalConversions = new Conversions.DecimalConversion(); @@ -305,7 +292,11 @@ public class HoodieTestDataGenerator { rec.put("tip_history", tipHistoryArray); } - rec.put("_hoodie_is_deleted", isDeleteRecord); + if (isDeleteRecord) { + rec.put("_hoodie_is_deleted", true); + } else { + rec.put("_hoodie_is_deleted", false); + } return rec; } @@ -769,8 +760,8 @@ public class HoodieTestDataGenerator { public static class KeyPartition implements Serializable { - public HoodieKey key; - public String partitionPath; + HoodieKey key; + String partitionPath; } public void close() { diff --git a/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java b/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java index 16d6f8d..3345204 100644 --- a/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java +++ b/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java @@ -27,7 +27,6 @@ import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; -import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ReflectionUtils; import org.apache.hudi.common.util.StringUtils; @@ -208,20 +207,11 @@ public class DataSourceUtils { /** * Create a payload class via reflection, passing in an ordering/precombine value. */ - public static HoodieRecordPayload createPayload(String payloadClass, GenericRecord record, - Comparable orderingVal, - String deleteMarkerField) throws IOException { + public static HoodieRecordPayload createPayload(String payloadClass, GenericRecord record, Comparable orderingVal) + throws IOException { try { - HoodieRecordPayload payload = null; - if (payloadClass.equals(OverwriteWithLatestAvroPayload.class.getName())) { - payload = (OverwriteWithLatestAvroPayload) ReflectionUtils.loadClass(payloadClass, - new Class<?>[]{GenericRecord.class, Comparable.class, String.class}, - record, orderingVal, deleteMarkerField); - } else { - payload = (HoodieRecordPayload) ReflectionUtils.loadClass(payloadClass, - new Class<?>[]{GenericRecord.class, Comparable.class}, record, orderingVal); - } - return payload; + return (HoodieRecordPayload) ReflectionUtils.loadClass(payloadClass, + new Class<?>[] {GenericRecord.class, Comparable.class}, record, orderingVal); } catch (Throwable e) { throw new IOException("Could not create payload for class: " + payloadClass, e); } @@ -277,9 +267,8 @@ public class DataSourceUtils { } public static HoodieRecord createHoodieRecord(GenericRecord gr, Comparable orderingVal, HoodieKey hKey, - String payloadClass, - String deleteMarkerField) throws IOException { - HoodieRecordPayload payload = DataSourceUtils.createPayload(payloadClass, gr, orderingVal, deleteMarkerField); + String payloadClass) throws IOException { + HoodieRecordPayload payload = DataSourceUtils.createPayload(payloadClass, gr, orderingVal); return new HoodieRecord<>(hKey, payload); } diff --git a/hudi-spark/src/main/java/org/apache/hudi/bootstrap/SparkParquetBootstrapDataProvider.java b/hudi-spark/src/main/java/org/apache/hudi/bootstrap/SparkParquetBootstrapDataProvider.java index 0d5756f..32e5230 100644 --- a/hudi-spark/src/main/java/org/apache/hudi/bootstrap/SparkParquetBootstrapDataProvider.java +++ b/hudi-spark/src/main/java/org/apache/hudi/bootstrap/SparkParquetBootstrapDataProvider.java @@ -70,7 +70,7 @@ public class SparkParquetBootstrapDataProvider extends FullRecordBootstrapDataPr gr, props.getString("hoodie.datasource.write.precombine.field"), false); try { return DataSourceUtils.createHoodieRecord(gr, orderingVal, keyGenerator.getKey(gr), - props.getString("hoodie.datasource.write.payload.class"), "_hoodie_is_deleted"); + props.getString("hoodie.datasource.write.payload.class")); } catch (IOException ioe) { throw new HoodieIOException(ioe.getMessage(), ioe); } diff --git a/hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala index e529a04..8a8f87f 100644 --- a/hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala +++ b/hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala @@ -185,13 +185,6 @@ object DataSourceWriteOptions { val DEFAULT_PAYLOAD_OPT_VAL = classOf[OverwriteWithLatestAvroPayload].getName /** - * Field used in OverwriteWithLatestAvroPayload combineAndGetUpdateValue, When two records have the same - * key value, we will check if the new record is deleted by the delete field. - */ - val DELETE_FIELD_OPT_KEY = "hoodie.datasource.write.delete.field" - val DEFAULT_DELETE_FIELD_OPT_VAL = "_hoodie_is_deleted" - - /** * Record key field. Value to be used as the `recordKey` component of `HoodieKey`. Actual value * will be obtained by invoking .toString() on the field value. Nested fields can be specified using * the dot notation eg: `a.b.c` diff --git a/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala index 574c907..05ef863 100644 --- a/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala +++ b/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala @@ -111,9 +111,7 @@ private[hudi] object HoodieSparkSqlWriter { val orderingVal = HoodieAvroUtils.getNestedFieldVal(gr, parameters(PRECOMBINE_FIELD_OPT_KEY), false) .asInstanceOf[Comparable[_]] DataSourceUtils.createHoodieRecord(gr, - orderingVal, keyGenerator.getKey(gr), - parameters(PAYLOAD_CLASS_OPT_KEY), - parameters(DELETE_FIELD_OPT_KEY)) + orderingVal, keyGenerator.getKey(gr), parameters(PAYLOAD_CLASS_OPT_KEY)) }).toJavaRDD() // Handle various save modes @@ -206,7 +204,6 @@ private[hudi] object HoodieSparkSqlWriter { TABLE_TYPE_OPT_KEY -> DEFAULT_TABLE_TYPE_OPT_VAL, PRECOMBINE_FIELD_OPT_KEY -> DEFAULT_PRECOMBINE_FIELD_OPT_VAL, PAYLOAD_CLASS_OPT_KEY -> DEFAULT_PAYLOAD_OPT_VAL, - DELETE_FIELD_OPT_KEY -> DEFAULT_DELETE_FIELD_OPT_VAL, RECORDKEY_FIELD_OPT_KEY -> DEFAULT_RECORDKEY_FIELD_OPT_VAL, PARTITIONPATH_FIELD_OPT_KEY -> DEFAULT_PARTITIONPATH_FIELD_OPT_VAL, KEYGENERATOR_CLASS_OPT_KEY -> DEFAULT_KEYGENERATOR_CLASS_OPT_VAL, diff --git a/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala b/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala index 29dac2b..7f26481 100644 --- a/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala +++ b/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala @@ -100,52 +100,6 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers { } } - test("test OverwriteWithLatestAvroPayload with user defined delete field") { - val session = SparkSession.builder() - .appName("test_append_mode") - .master("local[2]") - .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") - .getOrCreate() - val path = java.nio.file.Files.createTempDirectory("hoodie_test_path1") - - try { - val sqlContext = session.sqlContext - val hoodieFooTableName = "hoodie_foo_tbl" - - val keyField = "id" - val deleteMarkerField = "delete_field" - - //create a new table - val fooTableModifier = Map("path" -> path.toAbsolutePath.toString, - HoodieWriteConfig.TABLE_NAME -> hoodieFooTableName, - "hoodie.insert.shuffle.parallelism" -> "2", - "hoodie.upsert.shuffle.parallelism" -> "2", - DELETE_FIELD_OPT_KEY -> deleteMarkerField, - RECORDKEY_FIELD_OPT_KEY -> keyField) - val fooTableParams = HoodieSparkSqlWriter.parametersWithWriteDefaults(fooTableModifier) - - val id1 = UUID.randomUUID().toString - val dataFrame = session.createDataFrame(Seq( - (id1, 1, false) - )) toDF(keyField, "ts", deleteMarkerField) - - HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableParams, dataFrame) - val recordCount1 = sqlContext.read.format("org.apache.hudi").load(path.toString + "/*/*.parquet").count - assert(recordCount1 == 1, "result should be 1, but get " + recordCount1) - - val dataFrame2 = session.createDataFrame(Seq( - (id1, 2, true) - )) toDF(keyField, "ts", deleteMarkerField) - HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableParams, dataFrame2) - - val recordCount2 = sqlContext.read.format("org.apache.hudi").load(path.toString + "/*/*.parquet").count() - assert(recordCount2 == 0, "result should be 0, but get " + recordCount2) - } finally { - session.stop() - FileUtils.deleteDirectory(path.toFile) - } - } - case class Test(uuid: String, ts: Long) } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java index ecca0d1..f0ecba0 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java @@ -20,7 +20,6 @@ package org.apache.hudi.utilities.deltastreamer; import org.apache.hudi.AvroConversionUtils; import org.apache.hudi.DataSourceUtils; -import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.client.HoodieWriteClient; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.common.config.TypedProperties; @@ -340,12 +339,9 @@ public class DeltaSync implements Serializable { } JavaRDD<GenericRecord> avroRDD = avroRDDOptional.get(); - String deleteMarkerField = props.getString(HoodieWriteConfig.DELETE_MARKER_FIELD_PROP, - HoodieWriteConfig.DEFAULT_DELETE_MARKER_FIELD); JavaRDD<HoodieRecord> records = avroRDD.map(gr -> { HoodieRecordPayload payload = DataSourceUtils.createPayload(cfg.payloadClassName, gr, - (Comparable) HoodieAvroUtils.getNestedFieldVal(gr, cfg.sourceOrderingField, false), - deleteMarkerField); + (Comparable) DataSourceUtils.getNestedFieldVal(gr, cfg.sourceOrderingField, false)); return new HoodieRecord<>(keyGenerator.getKey(gr), payload); }); diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestDeltaStreamerWithOverwriteLatestAvroPayload.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestDeltaStreamerWithOverwriteLatestAvroPayload.java deleted file mode 100644 index f98eb79..0000000 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestDeltaStreamerWithOverwriteLatestAvroPayload.java +++ /dev/null @@ -1,97 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hudi.utilities.functional; - -import org.apache.avro.generic.GenericRecord; -import org.apache.hadoop.fs.Path; -import org.apache.hudi.common.config.TypedProperties; -import org.apache.hudi.common.testutils.HoodieTestDataGenerator; -import org.apache.hudi.config.HoodieWriteConfig; -import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer; -import org.apache.hudi.utilities.sources.ParquetDFSSource; -import org.apache.hudi.utilities.testutils.UtilitiesTestBase; -import org.apache.spark.sql.Row; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.Test; - -import java.util.List; - -import static org.junit.jupiter.api.Assertions.assertEquals; - -public class TestDeltaStreamerWithOverwriteLatestAvroPayload extends UtilitiesTestBase { - private static String PARQUET_SOURCE_ROOT; - private static final String PROPS_FILENAME_TEST_PARQUET = "test-parquet-dfs-source.properties"; - - @BeforeAll - public static void initClass() throws Exception { - UtilitiesTestBase.initClass(true); - PARQUET_SOURCE_ROOT = dfsBasePath + "/parquetFiles"; - - // prepare the configs. - UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/base.properties", dfs, dfsBasePath + "/base.properties"); - UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/sql-transformer.properties", dfs, - dfsBasePath + "/sql-transformer.properties"); - UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/source.avsc", dfs, dfsBasePath + "/source.avsc"); - UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/source-flattened.avsc", dfs, dfsBasePath + "/source-flattened.avsc"); - UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/target.avsc", dfs, dfsBasePath + "/target.avsc"); - } - - @Test - public void testOverwriteLatestAvroPayload() throws Exception { - // test defaultDeleteMarkerField - this.testOverwriteLatestAvroPayload(null); - - // test userDefinedDeleteMarkerField - this.testOverwriteLatestAvroPayload("user_defined_delete_marker_field"); - } - - private void testOverwriteLatestAvroPayload(String deleteMarkerField) throws Exception { - String path = PARQUET_SOURCE_ROOT + "/1.parquet"; - List<GenericRecord> records = HoodieTestDataGenerator.generateGenericRecords(5, false, 0); - Helpers.saveParquetToDFS(records, new Path(path)); - - TypedProperties parquetProps = new TypedProperties(); - parquetProps.setProperty("include", "base.properties"); - parquetProps.setProperty("hoodie.datasource.write.recordkey.field", "_row_key"); - parquetProps.setProperty("hoodie.datasource.write.partitionpath.field", "not_there"); - parquetProps.setProperty("hoodie.deltastreamer.source.dfs.root", PARQUET_SOURCE_ROOT); - if (deleteMarkerField != null) { - parquetProps.setProperty(HoodieWriteConfig.DELETE_MARKER_FIELD_PROP, deleteMarkerField); - } - Helpers.savePropsToDFS(parquetProps, dfs, dfsBasePath + "/" + PROPS_FILENAME_TEST_PARQUET); - - String tableBasePath = dfsBasePath + "/test_overwrite_lastest_avro_payload_table"; - - HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer( - TestHoodieDeltaStreamer.TestHelpers.makeConfig(tableBasePath, HoodieDeltaStreamer.Operation.INSERT, ParquetDFSSource.class.getName(), - null, PROPS_FILENAME_TEST_PARQUET, false, - false, 100000, false, null, null, "timestamp"), jsc); - deltaStreamer.sync(); - TestHoodieDeltaStreamer.TestHelpers.assertRecordCount(5, tableBasePath + "/*/*.parquet", sqlContext); - - String path2 = PARQUET_SOURCE_ROOT + "/2.parquet"; - List<GenericRecord> records2 = HoodieTestDataGenerator.generateGenericRecords(4, true, 1); - Helpers.saveParquetToDFS(records2, new Path(path2)); - deltaStreamer.sync(); - - List<Row> rows = sqlContext.read().format("org.apache.hudi").load(tableBasePath + "/*/*.parquet").collectAsList(); - assertEquals(1, rows.size()); - assertEquals(records.get(4).get("_row_key"), rows.get(0).getString(2)); - } -} diff --git a/hudi-utilities/src/test/resources/delta-streamer-config/source.avsc b/hudi-utilities/src/test/resources/delta-streamer-config/source.avsc index 7ae44b7..e912573 100644 --- a/hudi-utilities/src/test/resources/delta-streamer-config/source.avsc +++ b/hudi-utilities/src/test/resources/delta-streamer-config/source.avsc @@ -56,10 +56,6 @@ "name" : "nation", "type" : "bytes" },{ - "name" : "user_defined_delete_marker_field", - "type" : "boolean", - "default" : false - },{ "name" : "current_date", "type" : { "type" : "int", diff --git a/hudi-utilities/src/test/resources/delta-streamer-config/sql-transformer.properties b/hudi-utilities/src/test/resources/delta-streamer-config/sql-transformer.properties index 5eaa0a7..dc735e8 100644 --- a/hudi-utilities/src/test/resources/delta-streamer-config/sql-transformer.properties +++ b/hudi-utilities/src/test/resources/delta-streamer-config/sql-transformer.properties @@ -16,4 +16,4 @@ # limitations under the License. ### include=base.properties -hoodie.deltastreamer.transformer.sql=SELECT a.timestamp, a._row_key, a.rider, a.driver, a.begin_lat, a.begin_lon, a.end_lat, a.end_lon, a.distance_in_meters, a.seconds_since_epoch, a.weight, a.nation, a.user_defined_delete_marker_field, a.current_date, a.current_ts, a.height, a.city_to_state, a.fare, a.tip_history, a.`_hoodie_is_deleted`, CAST(1.0 AS DOUBLE) AS haversine_distance FROM <SRC> a +hoodie.deltastreamer.transformer.sql=SELECT a.timestamp, a._row_key, a.rider, a.driver, a.begin_lat, a.begin_lon, a.end_lat, a.end_lon, a.distance_in_meters, a.seconds_since_epoch, a.weight, a.nation, a.current_date, a.current_ts, a.height, a.city_to_state, a.fare, a.tip_history, a.`_hoodie_is_deleted`, CAST(1.0 AS DOUBLE) AS haversine_distance FROM <SRC> a diff --git a/hudi-utilities/src/test/resources/delta-streamer-config/target.avsc b/hudi-utilities/src/test/resources/delta-streamer-config/target.avsc index 815e328..a6234f4 100644 --- a/hudi-utilities/src/test/resources/delta-streamer-config/target.avsc +++ b/hudi-utilities/src/test/resources/delta-streamer-config/target.avsc @@ -56,10 +56,6 @@ "name" : "nation", "type" : "bytes" },{ - "name" : "user_defined_delete_marker_field", - "type" : "boolean", - "default" : false - },{ "name" : "current_date", "type" : { "type" : "int",