[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15218471#comment-15218471
 ] 

ASF GitHub Bot commented on APEXMALHAR-2011:
--------------------------------------------

Github user devtagare commented on a diff in the pull request:

    
https://github.com/apache/incubator-apex-malhar/pull/211#discussion_r57934935
  
    --- Diff: 
contrib/src/main/java/com/datatorrent/contrib/avro/AvroFileInputOperator.java 
---
    @@ -0,0 +1,164 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.contrib.avro;
    +
    +import java.io.IOException;
    +import java.io.InputStream;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import org.apache.avro.AvroRuntimeException;
    +import org.apache.avro.file.DataFileStream;
    +import org.apache.avro.generic.GenericDatumReader;
    +import org.apache.avro.generic.GenericRecord;
    +import org.apache.avro.io.DatumReader;
    +import org.apache.hadoop.classification.InterfaceStability;
    +import org.apache.hadoop.fs.Path;
    +
    +import com.datatorrent.api.AutoMetric;
    +import com.datatorrent.api.DefaultOutputPort;
    +import com.datatorrent.lib.io.fs.AbstractFileInputOperator;
    +
    +/**
    + * <p>
    + * Avro File Input Operator
    + * </p>
    + * A specific implementation of the AbstractFileInputOperator to read Avro
    + * container files.<br>
    + * No need to provide schema,its inferred from the file<br>
    + * Users can add the {@link 
IdempotentStorageManager.FSIdempotentStorageManager}
    + * to ensure exactly once semantics with a HDFS backed WAL.
    + * 
    + * @displayName AvroFileInputOperator
    + * @category Input
    + * @tags fs, file,avro, input operator
    + */
    [email protected]
    +public class AvroFileInputOperator extends 
AbstractFileInputOperator<GenericRecord>
    +{
    +
    +  private transient long offset = 0L;
    +
    +  @AutoMetric
    +  int recordCnt = 0;
    +
    +  @AutoMetric
    +  int errorCnt = 0;
    +
    +  private transient DataFileStream<GenericRecord> avroDataStream;
    +  private transient GenericRecord record = null;
    +
    +  public final transient DefaultOutputPort<GenericRecord> output = new 
DefaultOutputPort<GenericRecord>();
    +
    +  public final transient DefaultOutputPort<String> completedFilesPort = 
new DefaultOutputPort<String>();
    +
    +  public final transient DefaultOutputPort<String> errorRecordsPort = new 
DefaultOutputPort<String>();
    +
    +  /**
    +   * Returns a input stream given a file path
    +   * 
    +   * @param path
    +   * @return InputStream
    +   * @throws IOException
    +   */
    +  @Override
    +  protected InputStream openFile(Path path) throws IOException
    +  {
    +    InputStream is = super.openFile(path);
    +    if (is != null) {
    +      try {
    +        DatumReader<GenericRecord> datumReader = new 
GenericDatumReader<GenericRecord>();
    +        avroDataStream = new DataFileStream<GenericRecord>(is, 
datumReader);
    +        datumReader.setSchema(avroDataStream.getSchema());
    +      } catch (NullPointerException npe) {
    +        LOG.error("Schemaless file", npe);
    +        throw new NullPointerException();
    +      }
    +    }
    +    return is;
    +  }
    +
    +  @Override
    +  protected GenericRecord readEntity() throws IOException
    +  {
    +    return readRecord();
    +  }
    +
    +  /**
    +   * Reads a GenericRecord from the given input stream<br>
    +   * Emits the FileName,Offset,Exception on the error port if its connected
    +   * 
    +   * @return GenericRecord
    +   */
    +  private GenericRecord readRecord() throws IOException
    +  {
    +    record = null;
    +
    +    try {
    +      if (avroDataStream != null && avroDataStream.hasNext()) {
    +        offset++;
    +
    +        record = avroDataStream.next();
    +        recordCnt++;
    +        return record;
    +      }
    +    } catch (AvroRuntimeException are) {
    +      LOG.error("Exception in parsing record for file - " + 
super.currentFile + " at offset - " + offset, are);
    +      if (errorRecordsPort.isConnected()) {
    +        errorRecordsPort.emit("FileName:" + super.currentFile + ", 
Offset:" + offset);
    +      }
    +      errorCnt++;
    +      throw new AvroRuntimeException(are);
    +    }
    +    return record;
    +  }
    +
    +  @Override
    +  protected void closeFile(InputStream is) throws IOException
    +  {
    +    String fileName = super.currentFile;
    +    super.closeFile(is);
    +    if (avroDataStream != null) {
    +      avroDataStream.close();
    +    }
    +    if (completedFilesPort.isConnected()) {
    +      completedFilesPort.emit(fileName);
    +    }
    +    offset = 0;
    +  }
    +
    +  @Override
    +  protected void emit(GenericRecord tuple)
    +  {
    +    if (tuple != null) {
    +      output.emit(tuple);
    +    }
    +  }
    +
    +  @Override
    +  public void endWindow()
    +  {
    +    errorCnt = 0;
    --- End diff --
    
    Noticed that autometric variables are not marked private in several 
operators.see CsvParser, JsonParser. They will not be accessible in the test 
class unless getters are provided.
    Has the usage changed since they were first implemented ?
    beginWindow seems a better place for resetting the values, moving it there.


> POJO to Avro record converter
> -----------------------------
>
>                 Key: APEXMALHAR-2011
>                 URL: https://issues.apache.org/jira/browse/APEXMALHAR-2011
>             Project: Apache Apex Malhar
>          Issue Type: New Feature
>            Reporter: devendra tagare
>
> We are looking to develop a record converter which would take a POJO as an 
> input and emit a Generic record as the output based on the given Avro schema.
> The expected inputs for this operator would be,
> 1.Class Name of the incoming POJO
> 2.Avro schema for the Generic Record to emit.
> This operator would receive an Object on its input port and emit a Generic 
> record on the output port.
> To start with, we would handle primitive types and then go on to handle 
> complex types.
> Thanks,
> Dev



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to