[ https://issues.apache.org/jira/browse/APEXMALHAR-2034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16299411#comment-16299411 ]
ASF GitHub Bot commented on APEXMALHAR-2034:
--------------------------------------------

tweise closed pull request #670: APEXMALHAR-2034 Adding new Avro Module to encapsulate Container File to Avro GenericRecord to POJO transformations
URL: https://github.com/apache/apex-malhar/pull/670

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic):

diff --git a/contrib/src/main/java/com/datatorrent/contrib/avro/AvroFileToPojoModule.java b/contrib/src/main/java/com/datatorrent/contrib/avro/AvroFileToPojoModule.java
new file mode 100644
index 0000000000..8ad00dfc89
--- /dev/null
+++ b/contrib/src/main/java/com/datatorrent/contrib/avro/AvroFileToPojoModule.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.avro;
+
+import org.apache.apex.malhar.lib.wal.FSWindowDataManager;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.conf.Configuration;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.Module;
+
+/**
+ * <p>
+ * Avro File To Pojo Module
+ * </p>
+ * This module emits POJOs based on the schema derived from the
+ * input file<br>
+ *
+ * Example of how to configure and add this module to a DAG:
+ *
+ * AvroFileToPojoModule avroFileToPojoModule = new AvroFileToPojoModule();
+ * avroFileToPojoModule.setPojoClass([className.class]);
+ * avroFileToPojoModule.setAvroFileDirectory(conf.get("[configuration property]", "[default file directory]"));
+ * avroFileToPojoModule = dag.addModule("avroFileToPojoModule", avroFileToPojoModule);
+ *
+ * No need to provide a schema; it is inferred from the file<br>
+ *
+ * Users can add the {@link FSWindowDataManager}
+ * to ensure exactly-once semantics with an HDFS-backed WAL.
+ *
+ * @displayName AvroFileToPojoModule
+ * @category Input
+ * @tags fs, file, avro, input operator, generic record, pojo
+ *
+ * @since
+ */
+public class AvroFileToPojoModule implements Module
+{
+  public final transient ProxyOutputPort<Object> output = new ProxyOutputPort<>();
+  public final transient ProxyOutputPort<GenericRecord> errorPort = new ProxyOutputPort<>();
+  //output ports from AvroFileInputOperator
+  public final transient ProxyOutputPort<String> completedAvroFilesPort = new ProxyOutputPort<>();
+  public final transient ProxyOutputPort<String> avroErrorRecordsPort = new ProxyOutputPort<>();
+
+  private AvroFileInputOperator avroFileInputOperator = new AvroFileInputOperator();
+  Class<?> pojoClass = null;
+
+  @Override
+  public void populateDAG(DAG dag, Configuration configuration)
+  {
+    AvroFileInputOperator avroFileInputOperator = dag.addOperator("AvroFileInputOperator", this.avroFileInputOperator);
+    AvroToPojo avroToPojo = dag.addOperator("AvroGenericObjectToPojo", new AvroToPojo());
+
+    dag.setOutputPortAttribute(avroToPojo.output, Context.PortContext.TUPLE_CLASS, pojoClass);
+
+    dag.addStream("avroFileContainerToPojo", avroFileInputOperator.output, avroToPojo.data)
+        .setLocality(DAG.Locality.CONTAINER_LOCAL);
+
+    output.set(avroToPojo.output);
+    errorPort.set(avroToPojo.errorPort);
+
+    completedAvroFilesPort.set(avroFileInputOperator.completedFilesPort);
+    avroErrorRecordsPort.set(avroFileInputOperator.errorRecordsPort);
+  }
+
+  public void setPojoClass(Class<?> pojoClass)
+  {
+    this.pojoClass = pojoClass;
+  }
+
+  public void setAvroFileDirectory(String directory)
+  {
+    avroFileInputOperator.setDirectory(directory);
+  }
+}
diff --git a/contrib/src/test/java/com/datatorrent/contrib/avro/AvroFileToPojoModuleTest.java b/contrib/src/test/java/com/datatorrent/contrib/avro/AvroFileToPojoModuleTest.java
new file mode 100644
index 0000000000..696547512f
--- /dev/null
+++ b/contrib/src/test/java/com/datatorrent/contrib/avro/AvroFileToPojoModuleTest.java
@@ -0,0 +1,260 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.avro;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+
+import javax.validation.ConstraintViolationException;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestWatcher;
+import org.junit.runner.Description;
+import org.python.google.common.collect.Lists;
+
+import org.apache.apex.engine.EmbeddedAppLauncherImpl;
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumWriter;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
+
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.Context.PortContext;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.lib.helper.TestPortContext;
+import com.datatorrent.lib.io.ConsoleOutputOperator;
+
+import static com.datatorrent.lib.helper.OperatorContextTestHelper.mockOperatorContext;
+
+/**
+ * <p>
+ * In this class the emitTuples method is called twice to process the first
+ * input, since on begin window 0 the operator is set up and the stream is initialized.
+ * The platform calls the emitTuples method in the successive windows.
+ * </p>
+ */
+public class AvroFileToPojoModuleTest
+{
+
+  private static final String AVRO_SCHEMA = "{\"namespace\":\"abc\"," + "" + "\"type\":\"record\",\"doc\":\"Order schema\"," + "\"name\":\"Order\",\"fields\":[{\"name\":\"orderId\"," + "\"type\": \"long\"}," + "{\"name\":\"customerId\",\"type\": \"int\"}," + "{\"name\":\"total\",\"type\": \"double\"}," + "{\"name\":\"customerName\",\"type\": \"string\"}]}";
+
+  private static final String FILENAME = "/tmp/simpleorder.avro";
+  private static final String OTHER_FILE = "/tmp/simpleorder2.avro";
+
+  AvroFileToPojoModule avroFileToPojoModule = new AvroFileToPojoModule();
+
+  private List<GenericRecord> recordList = null;
+
+  public static class TestMeta extends TestWatcher
+  {
+    public String dir = null;
+    OperatorContext context;
+    PortContext portContext;
+
+    @Override
+    protected void starting(org.junit.runner.Description description)
+    {
+      String methodName = description.getMethodName();
+      String className = description.getClassName();
+      this.dir = "target/" + className + "/" + methodName;
+      Attribute.AttributeMap attributes = new Attribute.AttributeMap.DefaultAttributeMap();
+      attributes.put(Context.DAGContext.APPLICATION_PATH, dir);
+      context = mockOperatorContext(1, attributes);
+      Attribute.AttributeMap portAttributes = new Attribute.AttributeMap.DefaultAttributeMap();
+      portAttributes.put(Context.PortContext.TUPLE_CLASS, SimpleOrder.class);
+      portContext = new TestPortContext(portAttributes);
+    }
+
+    @Override
+    protected void finished(Description description)
+    {
+      try {
+        FileUtils.deleteDirectory(new File("target/" + description.getClassName()));
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+  @Rule
+  public TestMeta testMeta = new TestMeta();
+
+  private void createAvroInput(int cnt)
+  {
+    recordList = Lists.newArrayList();
+
+    while (cnt > 0) {
+      GenericRecord rec = new GenericData.Record(new Schema.Parser().parse(AVRO_SCHEMA));
+      rec.put("orderId", cnt * 1L);
rec.put("customerId", cnt * 2); + rec.put("total", cnt * 1.5); + rec.put("customerName", "*" + cnt + "*"); + cnt--; + recordList.add(rec); + } + } + + private void writeAvroFile(File outputFile) + { + DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(new Schema.Parser().parse(AVRO_SCHEMA)); + try (DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(datumWriter)) { + dataFileWriter.create(new Schema.Parser().parse(AVRO_SCHEMA), outputFile); + + for (GenericRecord record : recordList) { + dataFileWriter.append(record); + } + FileUtils.moveFileToDirectory(new File(outputFile.getAbsolutePath()), new File(testMeta.dir), true); + } catch (IOException e) { + e.printStackTrace(); + } + } + + @Test + public void testAvroToPojoModule() throws Exception + { + try { + FileContext.getLocalFSFileContext().delete(new Path(new File(testMeta.dir).getAbsolutePath()), true); + int cnt = 7; + createAvroInput(cnt); + writeAvroFile(new File(FILENAME)); + createAvroInput(cnt - 2); + writeAvroFile(new File(OTHER_FILE)); + + avroFileToPojoModule.setAvroFileDirectory(testMeta.dir); + avroFileToPojoModule.setPojoClass(SimpleOrder.class); + + AvroToPojo avroToPojo = new AvroToPojo(); + avroToPojo.setPojoClass(SimpleOrder.class); + + EmbeddedAppLauncherImpl lma = new EmbeddedAppLauncherImpl(); + Configuration conf = new Configuration(false); + + AvroToPojoApplication avroToPojoApplication = new AvroToPojoApplication(); + avroToPojoApplication.setAvroFileToPojoModule(avroFileToPojoModule); + + lma.prepareDAG(avroToPojoApplication, conf); + EmbeddedAppLauncherImpl.Controller lc = lma.getController(); + lc.run(10000);// runs for 10 seconds and quits + } catch (ConstraintViolationException e) { + Assert.fail("constraint violations: " + e.getConstraintViolations()); + } + } + + public static class AvroToPojoApplication implements StreamingApplication + { + AvroFileToPojoModule avroFileToPojoModule; + + public AvroFileToPojoModule getAvroFileToPojoModule() + { + return avroFileToPojoModule; + } + + public void setAvroFileToPojoModule(AvroFileToPojoModule avroFileToPojoModule) + { + this.avroFileToPojoModule = avroFileToPojoModule; + } + + @Override + public void populateDAG(DAG dag, Configuration conf) + { + AvroFileToPojoModule avroFileToPojoModule = dag.addModule("avroFileToPojoModule", getAvroFileToPojoModule()); + ConsoleOutputOperator consoleOutput = dag.addOperator("console", new ConsoleOutputOperator()); + + dag.addStream("POJO", avroFileToPojoModule.output, consoleOutput.input); + } + } + + public static class SimpleOrder + { + private Integer customerId; + private Long orderId; + private Double total; + private String customerName; + + public SimpleOrder() + { + } + + public SimpleOrder(int customerId, long orderId, double total, String customerName) + { + setCustomerId(customerId); + setOrderId(orderId); + setTotal(total); + setCustomerName(customerName); + } + + public String getCustomerName() + { + return customerName; + } + + public void setCustomerName(String customerName) + { + this.customerName = customerName; + } + + public Integer getCustomerId() + { + return customerId; + } + + public void setCustomerId(Integer customerId) + { + this.customerId = customerId; + } + + public Long getOrderId() + { + return orderId; + } + + public void setOrderId(Long orderId) + { + this.orderId = orderId; + } + + public Double getTotal() + { + return total; + } + + public void setTotal(Double total) + { + this.total = total; + } + + @Override + public String toString() + { + return 
"SimpleOrder [customerId=" + customerId + ", orderId=" + orderId + ", total=" + total + ", customerName=" + customerName + "]"; + } + } +} ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Avro File To POJO Module > ------------------------ > > Key: APEXMALHAR-2034 > URL: https://issues.apache.org/jira/browse/APEXMALHAR-2034 > Project: Apache Apex Malhar > Issue Type: New Feature > Reporter: devendra tagare > Assignee: Saumya Mohan > > Issue: > Avro objects are not serialized by Kryo causing the Avro GenericRecord to not > be available to downstream operators if users don't explicitly mark the > stream locality at container_local or thread_local. > Solution: > This JIRA is used to create a Module on top of AvroFileInputOperator and > AvroToPojo operators such that downstream operators will access POJO instead > of Avro GenericRecord. It, therefore, removes the exposure of GenericRecord > to downstream operators and instead exposes the created POJO to downstream > operators. > In this Module, the stream between the two encapsulated operators > (AvroFileInputOperator and AvroToPojo) is set to CONTAINER_LOCAL. > -Along with this new module, existing avro support files are moved from > contrib module to a new 'avro' module.- > --------------------------------------------------------------------------------------------------------------------------------------------------------------------- > *Unit Test* > Unit test for this Avro module has been added in malhar-avro package. > -*Move to new package and Backward compatibility*- > -Additionally, this module is part of a new package 'malhar-avro' and the > operator files/tests are all moved from contrib package to the new package.- > -Old operator files are marked deprecated and made to extend from new > operator files for backward compatibility.- > -Creating a new maven module for Avro is in accordance with the JIRA > "https://issues.apache.org/jira/browse/APEXMALHAR-1843."- > -Git history of all the moved files is maintained- > *Application Level Testing* > - To test the module, I created a sample StreamingApplication and a POJO > class. This application adds the new AvroToPojoModule, and ConsoleOperator to > the DAG. ConsoleOperator received and displayed POJO from the module > -To test backward compatibility, I created sample application which adds > AvroFileInputOperator and AvroToPojo from the old package to the DAG. It also > adds ConsoleOperator to the DAG. ConsoleOperator received and displayed POJO > from the module- -- This message was sent by Atlassian JIRA (v6.4.14#64029)