This is an automated email from the ASF dual-hosted git repository. vbalaji pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push: new 5e61454 [HUDI-802] AWSDmsTransformer does not handle insert and delete of a row in a single batch correctly (#2084) 5e61454 is described below commit 5e61454a6c343225e6ca90c4be79e0c2937a748a Author: Balaji Varadarajan <balaji.varadara...@robinhood.com> AuthorDate: Fri Sep 11 16:11:42 2020 -0700 [HUDI-802] AWSDmsTransformer does not handle insert and delete of a row in a single batch correctly (#2084) --- .../org/apache/hudi/payload/AWSDmsAvroPayload.java | 23 +++- .../apache/hudi/payload/TestAWSDmsAvroPayload.java | 132 +++++++++++++++++++++ 2 files changed, 151 insertions(+), 4 deletions(-) diff --git a/hudi-spark/src/main/java/org/apache/hudi/payload/AWSDmsAvroPayload.java b/hudi-spark/src/main/java/org/apache/hudi/payload/AWSDmsAvroPayload.java index 975151c..73711c7 100644 --- a/hudi-spark/src/main/java/org/apache/hudi/payload/AWSDmsAvroPayload.java +++ b/hudi-spark/src/main/java/org/apache/hudi/payload/AWSDmsAvroPayload.java @@ -53,10 +53,12 @@ public class AWSDmsAvroPayload extends OverwriteWithLatestAvroPayload { this(record.get(), (record1) -> 0); // natural order } - @Override - public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema) - throws IOException { - IndexedRecord insertValue = getInsertValue(schema).get(); + /** + * + * Handle a possible delete - check for "D" in Op column and return empty row if found. + * @param insertValue The new row that is being "inserted". + */ + private Option<IndexedRecord> handleDeleteOperation(IndexedRecord insertValue) throws IOException { boolean delete = false; if (insertValue instanceof GenericRecord) { GenericRecord record = (GenericRecord) insertValue; @@ -65,4 +67,17 @@ public class AWSDmsAvroPayload extends OverwriteWithLatestAvroPayload { return delete ? 
Option.empty() : Option.of(insertValue); } + + @Override + public Option<IndexedRecord> getInsertValue(Schema schema) throws IOException { + IndexedRecord insertValue = super.getInsertValue(schema).get(); + return handleDeleteOperation(insertValue); + } + + @Override + public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema) + throws IOException { + IndexedRecord insertValue = super.getInsertValue(schema).get(); + return handleDeleteOperation(insertValue); + } } diff --git a/hudi-spark/src/test/java/org/apache/hudi/payload/TestAWSDmsAvroPayload.java b/hudi-spark/src/test/java/org/apache/hudi/payload/TestAWSDmsAvroPayload.java new file mode 100644 index 0000000..802096a --- /dev/null +++ b/hudi-spark/src/test/java/org/apache/hudi/payload/TestAWSDmsAvroPayload.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.payload; + +import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload; +import org.apache.hudi.common.util.Option; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.IndexedRecord; + +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; + +public class TestAWSDmsAvroPayload { + + private static final String AVRO_SCHEMA_STRING = "{\"type\": \"record\"," + + "\"name\": \"events\"," + "\"fields\": [ " + + "{\"name\": \"field1\", \"type\" : \"int\"}," + + "{\"name\": \"Op\", \"type\": \"string\"}" + + "]}"; + + @Test + public void testInsert() { + + Schema avroSchema = new Schema.Parser().parse(AVRO_SCHEMA_STRING); + GenericRecord record = new GenericData.Record(avroSchema); + record.put("field1", 0); + record.put("Op", "I"); + + AWSDmsAvroPayload payload = new AWSDmsAvroPayload(Option.of(record)); + + try { + Option<IndexedRecord> outputPayload = payload.getInsertValue(avroSchema); + assertTrue((int) outputPayload.get().get(0) == 0); + assertTrue(outputPayload.get().get(1).toString().equals("I")); + } catch (Exception e) { + fail("Unexpected exception"); + } + + } + + @Test + public void testUpdate() { + Schema avroSchema = new Schema.Parser().parse(AVRO_SCHEMA_STRING); + GenericRecord newRecord = new GenericData.Record(avroSchema); + newRecord.put("field1", 1); + newRecord.put("Op", "U"); + + GenericRecord oldRecord = new GenericData.Record(avroSchema); + oldRecord.put("field1", 0); + oldRecord.put("Op", "I"); + + AWSDmsAvroPayload payload = new AWSDmsAvroPayload(Option.of(newRecord)); + + try { + Option<IndexedRecord> outputPayload = payload.combineAndGetUpdateValue(oldRecord, avroSchema); + assertTrue((int) outputPayload.get().get(0) == 1); + 
assertTrue(outputPayload.get().get(1).toString().equals("U")); + } catch (Exception e) { + fail("Unexpected exception"); + } + + } + + @Test + public void testDelete() { + Schema avroSchema = new Schema.Parser().parse(AVRO_SCHEMA_STRING); + GenericRecord deleteRecord = new GenericData.Record(avroSchema); + deleteRecord.put("field1", 2); + deleteRecord.put("Op", "D"); + + GenericRecord oldRecord = new GenericData.Record(avroSchema); + oldRecord.put("field1", 2); + oldRecord.put("Op", "U"); + + AWSDmsAvroPayload payload = new AWSDmsAvroPayload(Option.of(deleteRecord)); + + try { + Option<IndexedRecord> outputPayload = payload.combineAndGetUpdateValue(oldRecord, avroSchema); + // expect nothing to be committed to table + assertFalse(outputPayload.isPresent()); + } catch (Exception e) { + fail("Unexpected exception"); + } + + } + + @Test + public void testPreCombineWithDelete() { + Schema avroSchema = new Schema.Parser().parse(AVRO_SCHEMA_STRING); + GenericRecord deleteRecord = new GenericData.Record(avroSchema); + deleteRecord.put("field1", 4); + deleteRecord.put("Op", "D"); + + GenericRecord oldRecord = new GenericData.Record(avroSchema); + oldRecord.put("field1", 4); + oldRecord.put("Op", "I"); + + AWSDmsAvroPayload payload = new AWSDmsAvroPayload(Option.of(deleteRecord)); + AWSDmsAvroPayload insertPayload = new AWSDmsAvroPayload(Option.of(oldRecord)); + + try { + OverwriteWithLatestAvroPayload output = payload.preCombine(insertPayload); + Option<IndexedRecord> outputPayload = output.getInsertValue(avroSchema); + // expect nothing to be committed to table + assertFalse(outputPayload.isPresent()); + } catch (Exception e) { + fail("Unexpected exception"); + } + } +}