[hudi] branch master updated: [HUDI-802] AWSDmsTransformer does not handle insert and delete of a row in a single batch correctly (#2084)
This is an automated email from the ASF dual-hosted git repository. vbalaji pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hudi.git The following commit(s) were added to refs/heads/master by this push: new 5e61454 [HUDI-802] AWSDmsTransformer does not handle insert and delete of a row in a single batch correctly (#2084) 5e61454 is described below commit 5e61454a6c343225e6ca90c4be79e0c2937a748a Author: Balaji Varadarajan AuthorDate: Fri Sep 11 16:11:42 2020 -0700 [HUDI-802] AWSDmsTransformer does not handle insert and delete of a row in a single batch correctly (#2084) --- .../org/apache/hudi/payload/AWSDmsAvroPayload.java | 23 +++- .../apache/hudi/payload/TestAWSDmsAvroPayload.java | 132 + 2 files changed, 151 insertions(+), 4 deletions(-) diff --git a/hudi-spark/src/main/java/org/apache/hudi/payload/AWSDmsAvroPayload.java b/hudi-spark/src/main/java/org/apache/hudi/payload/AWSDmsAvroPayload.java index 975151c..73711c7 100644 --- a/hudi-spark/src/main/java/org/apache/hudi/payload/AWSDmsAvroPayload.java +++ b/hudi-spark/src/main/java/org/apache/hudi/payload/AWSDmsAvroPayload.java @@ -53,10 +53,12 @@ public class AWSDmsAvroPayload extends OverwriteWithLatestAvroPayload { this(record.get(), (record1) -> 0); // natural order } - @Override - public Option combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema) - throws IOException { -IndexedRecord insertValue = getInsertValue(schema).get(); + /** + * + * Handle a possible delete - check for "D" in Op column and return empty row if found. + * @param insertValue The new row that is being "inserted". + */ + private Option handleDeleteOperation(IndexedRecord insertValue) throws IOException { boolean delete = false; if (insertValue instanceof GenericRecord) { GenericRecord record = (GenericRecord) insertValue; @@ -65,4 +67,17 @@ public class AWSDmsAvroPayload extends OverwriteWithLatestAvroPayload { return delete ? Option.empty() : Option.of(insertValue); } + + @Override + public Option getInsertValue(Schema schema) throws IOException { +IndexedRecord insertValue = super.getInsertValue(schema).get(); +return handleDeleteOperation(insertValue); + } + + @Override + public Option combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema) + throws IOException { +IndexedRecord insertValue = super.getInsertValue(schema).get(); +return handleDeleteOperation(insertValue); + } } diff --git a/hudi-spark/src/test/java/org/apache/hudi/payload/TestAWSDmsAvroPayload.java b/hudi-spark/src/test/java/org/apache/hudi/payload/TestAWSDmsAvroPayload.java new file mode 100644 index 000..802096a --- /dev/null +++ b/hudi-spark/src/test/java/org/apache/hudi/payload/TestAWSDmsAvroPayload.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.payload; + +import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload; +import org.apache.hudi.common.util.Option; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.IndexedRecord; + +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; + +public class TestAWSDmsAvroPayload { + + private static final String AVRO_SCHEMA_STRING = "{\"type\": \"record\"," + + "\"name\": \"events\"," + "\"fields\": [ " + + "{\"name\": \"field1\", \"type\" : \"int\"}," + + "{\"name\": \"Op\", \"type\": \"string\"}" + + "]}"; + + @Test + public void testInsert() { + +Schema avroSchema = new Schema.Parser().parse(AVRO_SCHEMA_STRING); +GenericRecord record = new GenericData.Record(avroSchema); +record.put("field1", 0); +record.put("Op", "I"); + +AWSDmsAvroPayload payload = new AWSDmsAvroPayload(Option.of(record)); + +try { + Option outputPayload = payload.getInsertValue(avroSchema); + assertTrue((int) outputPayload.get().get(0) == 0); +
[hudi] branch master updated: [HUDI-802] AWSDmsTransformer does not handle insert and delete of a row in a single batch correctly (#2084)
This is an automated email from the ASF dual-hosted git repository. vbalaji pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hudi.git The following commit(s) were added to refs/heads/master by this push: new 5e61454 [HUDI-802] AWSDmsTransformer does not handle insert and delete of a row in a single batch correctly (#2084) 5e61454 is described below commit 5e61454a6c343225e6ca90c4be79e0c2937a748a Author: Balaji Varadarajan AuthorDate: Fri Sep 11 16:11:42 2020 -0700 [HUDI-802] AWSDmsTransformer does not handle insert and delete of a row in a single batch correctly (#2084) --- .../org/apache/hudi/payload/AWSDmsAvroPayload.java | 23 +++- .../apache/hudi/payload/TestAWSDmsAvroPayload.java | 132 + 2 files changed, 151 insertions(+), 4 deletions(-) diff --git a/hudi-spark/src/main/java/org/apache/hudi/payload/AWSDmsAvroPayload.java b/hudi-spark/src/main/java/org/apache/hudi/payload/AWSDmsAvroPayload.java index 975151c..73711c7 100644 --- a/hudi-spark/src/main/java/org/apache/hudi/payload/AWSDmsAvroPayload.java +++ b/hudi-spark/src/main/java/org/apache/hudi/payload/AWSDmsAvroPayload.java @@ -53,10 +53,12 @@ public class AWSDmsAvroPayload extends OverwriteWithLatestAvroPayload { this(record.get(), (record1) -> 0); // natural order } - @Override - public Option combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema) - throws IOException { -IndexedRecord insertValue = getInsertValue(schema).get(); + /** + * + * Handle a possible delete - check for "D" in Op column and return empty row if found. + * @param insertValue The new row that is being "inserted". + */ + private Option handleDeleteOperation(IndexedRecord insertValue) throws IOException { boolean delete = false; if (insertValue instanceof GenericRecord) { GenericRecord record = (GenericRecord) insertValue; @@ -65,4 +67,17 @@ public class AWSDmsAvroPayload extends OverwriteWithLatestAvroPayload { return delete ? Option.empty() : Option.of(insertValue); } + + @Override + public Option getInsertValue(Schema schema) throws IOException { +IndexedRecord insertValue = super.getInsertValue(schema).get(); +return handleDeleteOperation(insertValue); + } + + @Override + public Option combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema) + throws IOException { +IndexedRecord insertValue = super.getInsertValue(schema).get(); +return handleDeleteOperation(insertValue); + } } diff --git a/hudi-spark/src/test/java/org/apache/hudi/payload/TestAWSDmsAvroPayload.java b/hudi-spark/src/test/java/org/apache/hudi/payload/TestAWSDmsAvroPayload.java new file mode 100644 index 000..802096a --- /dev/null +++ b/hudi-spark/src/test/java/org/apache/hudi/payload/TestAWSDmsAvroPayload.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.payload; + +import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload; +import org.apache.hudi.common.util.Option; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.IndexedRecord; + +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; + +public class TestAWSDmsAvroPayload { + + private static final String AVRO_SCHEMA_STRING = "{\"type\": \"record\"," + + "\"name\": \"events\"," + "\"fields\": [ " + + "{\"name\": \"field1\", \"type\" : \"int\"}," + + "{\"name\": \"Op\", \"type\": \"string\"}" + + "]}"; + + @Test + public void testInsert() { + +Schema avroSchema = new Schema.Parser().parse(AVRO_SCHEMA_STRING); +GenericRecord record = new GenericData.Record(avroSchema); +record.put("field1", 0); +record.put("Op", "I"); + +AWSDmsAvroPayload payload = new AWSDmsAvroPayload(Option.of(record)); + +try { + Option outputPayload = payload.getInsertValue(avroSchema); + assertTrue((int) outputPayload.get().get(0) == 0); +
[hudi] branch master updated: [HUDI-802] AWSDmsTransformer does not handle insert and delete of a row in a single batch correctly (#2084)
This is an automated email from the ASF dual-hosted git repository. vbalaji pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hudi.git The following commit(s) were added to refs/heads/master by this push: new 5e61454 [HUDI-802] AWSDmsTransformer does not handle insert and delete of a row in a single batch correctly (#2084) 5e61454 is described below commit 5e61454a6c343225e6ca90c4be79e0c2937a748a Author: Balaji Varadarajan AuthorDate: Fri Sep 11 16:11:42 2020 -0700 [HUDI-802] AWSDmsTransformer does not handle insert and delete of a row in a single batch correctly (#2084) --- .../org/apache/hudi/payload/AWSDmsAvroPayload.java | 23 +++- .../apache/hudi/payload/TestAWSDmsAvroPayload.java | 132 + 2 files changed, 151 insertions(+), 4 deletions(-) diff --git a/hudi-spark/src/main/java/org/apache/hudi/payload/AWSDmsAvroPayload.java b/hudi-spark/src/main/java/org/apache/hudi/payload/AWSDmsAvroPayload.java index 975151c..73711c7 100644 --- a/hudi-spark/src/main/java/org/apache/hudi/payload/AWSDmsAvroPayload.java +++ b/hudi-spark/src/main/java/org/apache/hudi/payload/AWSDmsAvroPayload.java @@ -53,10 +53,12 @@ public class AWSDmsAvroPayload extends OverwriteWithLatestAvroPayload { this(record.get(), (record1) -> 0); // natural order } - @Override - public Option combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema) - throws IOException { -IndexedRecord insertValue = getInsertValue(schema).get(); + /** + * + * Handle a possible delete - check for "D" in Op column and return empty row if found. + * @param insertValue The new row that is being "inserted". + */ + private Option handleDeleteOperation(IndexedRecord insertValue) throws IOException { boolean delete = false; if (insertValue instanceof GenericRecord) { GenericRecord record = (GenericRecord) insertValue; @@ -65,4 +67,17 @@ public class AWSDmsAvroPayload extends OverwriteWithLatestAvroPayload { return delete ? Option.empty() : Option.of(insertValue); } + + @Override + public Option getInsertValue(Schema schema) throws IOException { +IndexedRecord insertValue = super.getInsertValue(schema).get(); +return handleDeleteOperation(insertValue); + } + + @Override + public Option combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema) + throws IOException { +IndexedRecord insertValue = super.getInsertValue(schema).get(); +return handleDeleteOperation(insertValue); + } } diff --git a/hudi-spark/src/test/java/org/apache/hudi/payload/TestAWSDmsAvroPayload.java b/hudi-spark/src/test/java/org/apache/hudi/payload/TestAWSDmsAvroPayload.java new file mode 100644 index 000..802096a --- /dev/null +++ b/hudi-spark/src/test/java/org/apache/hudi/payload/TestAWSDmsAvroPayload.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.payload; + +import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload; +import org.apache.hudi.common.util.Option; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.IndexedRecord; + +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; + +public class TestAWSDmsAvroPayload { + + private static final String AVRO_SCHEMA_STRING = "{\"type\": \"record\"," + + "\"name\": \"events\"," + "\"fields\": [ " + + "{\"name\": \"field1\", \"type\" : \"int\"}," + + "{\"name\": \"Op\", \"type\": \"string\"}" + + "]}"; + + @Test + public void testInsert() { + +Schema avroSchema = new Schema.Parser().parse(AVRO_SCHEMA_STRING); +GenericRecord record = new GenericData.Record(avroSchema); +record.put("field1", 0); +record.put("Op", "I"); + +AWSDmsAvroPayload payload = new AWSDmsAvroPayload(Option.of(record)); + +try { + Option outputPayload = payload.getInsertValue(avroSchema); + assertTrue((int) outputPayload.get().get(0) == 0); +