[
https://issues.apache.org/jira/browse/APEXMALHAR-2011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15217503#comment-15217503
]
ASF GitHub Bot commented on APEXMALHAR-2011:
--------------------------------------------
Github user chinmaykolhatkar commented on a diff in the pull request:
https://github.com/apache/incubator-apex-malhar/pull/211#discussion_r57842432
--- Diff:
contrib/src/main/java/com/datatorrent/contrib/avro/AvroFileInputOperator.java
---
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.avro;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.avro.AvroRuntimeException;
+import org.apache.avro.file.DataFileStream;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumReader;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.Path;
+
+import com.datatorrent.api.AutoMetric;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.lib.io.fs.AbstractFileInputOperator;
+
+/**
+ * <p>
+ * Avro File Input Operator
+ * </p>
+ * A specific implementation of the AbstractFileInputOperator to read Avro
+ * container files.<br>
+ * No need to provide a schema; it is inferred from the file.<br>
+ * Users can add the {@link IdempotentStorageManager.FSIdempotentStorageManager}
+ * to ensure exactly-once semantics with an HDFS-backed WAL.
+ *
+ * @displayName AvroFileInputOperator
+ * @category Input
+ * @tags fs, file, avro, input operator
+ */
[email protected]
+public class AvroFileInputOperator extends AbstractFileInputOperator<GenericRecord>
+{
+
+ private transient long offset = 0L;
+
+ @AutoMetric
+ int recordCnt = 0;
+
+ @AutoMetric
+ int errorCnt = 0;
+
+ private transient DataFileStream<GenericRecord> avroDataStream;
+ private transient GenericRecord record = null;
+
+ public final transient DefaultOutputPort<GenericRecord> output = new DefaultOutputPort<GenericRecord>();
+
+ public final transient DefaultOutputPort<String> completedFilesPort = new DefaultOutputPort<String>();
+
+ public final transient DefaultOutputPort<String> errorRecordsPort = new DefaultOutputPort<String>();
+
+ /**
+ * Returns an input stream given a file path
+ *
+ * @param path
+ * @return InputStream
+ * @throws IOException
+ */
+ @Override
+ protected InputStream openFile(Path path) throws IOException
+ {
+ InputStream is = super.openFile(path);
+ if (is != null) {
+ try {
+ DatumReader<GenericRecord> datumReader = new GenericDatumReader<GenericRecord>();
+ avroDataStream = new DataFileStream<GenericRecord>(is, datumReader);
+ datumReader.setSchema(avroDataStream.getSchema());
+ } catch (NullPointerException npe) {
+ LOG.error("Schemaless file", npe);
+ throw npe;
+ }
+ }
+ return is;
+ }
+
+ @Override
+ protected GenericRecord readEntity() throws IOException
+ {
+ return readRecord();
+ }
+
+ /**
+ * Reads a GenericRecord from the given input stream<br>
+ * Emits the file name, offset, and exception on the error port if it is connected
+ *
+ * @return GenericRecord
+ */
+ private GenericRecord readRecord() throws IOException
+ {
+ record = null;
+
+ try {
+ if (avroDataStream != null && avroDataStream.hasNext()) {
+ offset++;
+
+ record = avroDataStream.next();
+ recordCnt++;
+ return record;
+ }
+ } catch (AvroRuntimeException are) {
+ LOG.error("Exception in parsing record for file - " + super.currentFile + " at offset - " + offset, are);
+ if (errorRecordsPort.isConnected()) {
+ errorRecordsPort.emit("FileName:" + super.currentFile + ", Offset:" + offset);
+ }
+ errorCnt++;
+ throw new AvroRuntimeException(are);
+ }
+ return record;
+ }
+
+ @Override
+ protected void closeFile(InputStream is) throws IOException
+ {
+ String fileName = super.currentFile;
+ super.closeFile(is);
+ if (avroDataStream != null) {
+ avroDataStream.close();
+ }
+ if (completedFilesPort.isConnected()) {
+ completedFilesPort.emit(fileName);
+ }
+ offset = 0;
+ }
+
+ @Override
+ protected void emit(GenericRecord tuple)
+ {
+ if (tuple != null) {
+ output.emit(tuple);
+ }
+ }
+
+ @Override
+ public void endWindow()
+ {
+ errorCnt = 0;
--- End diff --
Can you please check with @chandnisingh about the best practice for resetting
AutoMetric variables: in beginWindow or endWindow?
If the values of AutoMetric variables are picked up after endWindow is called,
they will always be 0.
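For illustration, a minimal sketch of the beginWindow alternative, assuming the
platform collects @AutoMetric fields only after endWindow() returns (this is
not part of the patch, just the idea being raised):

  @Override
  public void beginWindow(long windowId)
  {
    super.beginWindow(windowId);
    // reset per-window counters at the start of the window so the values
    // reported after endWindow() still reflect the window that just ended
    recordCnt = 0;
    errorCnt = 0;
  }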
> POJO to Avro record converter
> -----------------------------
>
> Key: APEXMALHAR-2011
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2011
> Project: Apache Apex Malhar
> Issue Type: New Feature
> Reporter: devendra tagare
>
> We are looking to develop a record converter which would take a POJO as
> input and emit a GenericRecord as output based on the given Avro schema.
> The expected inputs for this operator would be:
> 1. Class name of the incoming POJO
> 2. Avro schema for the GenericRecord to emit
> This operator would receive an Object on its input port and emit a
> GenericRecord on the output port.
> To start with, we would handle primitive types and then go on to handle
> complex types.
> Thanks,
> Dev
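A minimal, hypothetical sketch of the primitive-type case described in the
issue above (the class name, reflection-based field matching, and null
handling are illustrative assumptions, not the committed design):

import java.lang.reflect.Field;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

public class PojoToAvroSketch
{
  /**
   * Builds a GenericRecord from a POJO by matching schema field names
   * against the POJO's declared fields (primitive types only).
   */
  public static GenericRecord convert(Object pojo, Schema schema) throws IllegalAccessException
  {
    GenericRecord record = new GenericData.Record(schema);
    for (Schema.Field schemaField : schema.getFields()) {
      try {
        Field pojoField = pojo.getClass().getDeclaredField(schemaField.name());
        pojoField.setAccessible(true);
        record.put(schemaField.name(), pojoField.get(pojo));
      } catch (NoSuchFieldException e) {
        // fields missing on the POJO are left as null
        record.put(schemaField.name(), null);
      }
    }
    return record;
  }
}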
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)