Tarandeep,

Thanks for pasting your code!

I have a PR ready that extends AvroInputFormat and will submit it soon.

Still waiting for the legal team at AOL to approve it.

-Gna

On Sat, Apr 2, 2016 at 5:36 PM, Tarandeep Singh <tarand...@gmail.com> wrote:

> Thank you Gna for opening the ticket.
>
> I looked into the AvroInputFormat code and, inspired by it, wrote a
> GenericAvroInputFormat. The code is nearly identical to the original
> AvroInputFormat (and hence redundant), so it is a good idea to modify
> AvroInputFormat in Flink to support GenericRecord.
>
> Anyway, I am pasting the code here for anyone who wants to use it (until
> your code is part of a stable Flink release):
>
> import java.io.IOException;
>
> import org.apache.avro.Schema;
> import org.apache.avro.file.DataFileReader;
> import org.apache.avro.file.FileReader;
> import org.apache.avro.file.SeekableInput;
> import org.apache.avro.generic.GenericDatumReader;
> import org.apache.avro.generic.GenericRecord;
> import org.apache.avro.io.DatumReader;
> import org.apache.flink.api.avro.FSDataInputStreamWrapper;
> import org.apache.flink.api.common.io.FileInputFormat;
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
> import org.apache.flink.api.java.typeutils.TypeExtractor;
> import org.apache.flink.core.fs.FileInputSplit;
> import org.apache.flink.core.fs.Path;
> import org.apache.flink.util.InstantiationUtil;
>
> public class GenericAvroInputFormat extends FileInputFormat<GenericRecord>
>         implements ResultTypeQueryable<GenericRecord> {
>
>     private transient long end;
>     private transient Schema schema;
>     private transient FileReader<GenericRecord> fileReader;
>     private boolean reuseAvroValue = true;
>
>     private static final long serialVersionUID = 1L;
>
>     public GenericAvroInputFormat(Path filePath, Schema schema) {
>         super(filePath);
>         this.schema = schema;
>     }
>
>     public void setReuseAvroValue(boolean reuseAvroValue) {
>         this.reuseAvroValue = reuseAvroValue;
>     }
>
>     public void setUnsplittable(boolean unsplittable) {
>         this.unsplittable = unsplittable;
>     }
>
>     @Override
>     public TypeInformation<GenericRecord> getProducedType() {
>         return TypeExtractor.getForClass(GenericRecord.class);
>     }
>
>     @Override
>     public void open(FileInputSplit split) throws IOException {
>         super.open(split);
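>         // Wrap Flink's input stream so Avro can seek in it; the wrapper needs the file length.
>         SeekableInput sin = new FSDataInputStreamWrapper(stream,
>                 split.getPath().getFileSystem().getFileStatus(split.getPath()).getLen());
>         // No reader schema is passed, so records are decoded with the schema stored in the file.
>         DatumReader<GenericRecord> reader = new GenericDatumReader<>();
>         fileReader = DataFileReader.openReader(sin, reader);
>         // Jump to the first sync point after the split start; reachedEnd() stops past the split end.
>         fileReader.sync(split.getStart());
>         this.end = split.getStart() + split.getLength();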
>     }
>
>     @Override
>     public boolean reachedEnd() throws IOException {
>         return !fileReader.hasNext() || fileReader.pastSync(end);
>     }
>
>     @Override
>     public GenericRecord nextRecord(GenericRecord reuseValue) throws IOException {
>         if (reachedEnd()) {
>             return null;
>         }
>
>         if (!reuseAvroValue) {
>             // GenericRecord is an interface and cannot be instantiated directly;
>             // passing null makes the Avro reader allocate a fresh record.
>             reuseValue = null;
>         }
>
>         reuseValue = fileReader.next(reuseValue);
>         return reuseValue;
>     }
> }
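
A note on the split handling in open() and reachedEnd() above, since it is the subtle part: Avro container files are made of blocks separated by sync markers, and sync(start) together with pastSync(end) is what lets several splits of one file be read in parallel. Below is a minimal, Avro-free sketch of that ownership rule as I understand it, not Avro's actual implementation. The class SplitOwnership, the method blocksForSplit and all offsets are hypothetical; the model assumes a block belongs to the split whose byte range contains the start of the sync marker in front of it.

```java
import java.util.ArrayList;
import java.util.List;

public class SplitOwnership {

    /**
     * Returns the sync-marker offsets of the blocks that a reader for the
     * split [start, start + length) would consume in this simplified model.
     * syncOffsets must be sorted in ascending order.
     */
    public static List<Long> blocksForSplit(long[] syncOffsets, long start, long length) {
        long end = start + length;
        List<Long> owned = new ArrayList<>();
        for (long offset : syncOffsets) {
            // Models sync(start): skip everything before the first marker at or after start.
            if (offset < start) {
                continue;
            }
            // Models pastSync(end): stop once the marker lies at or beyond the split end.
            if (offset >= end) {
                break;
            }
            owned.add(offset);
        }
        return owned;
    }

    public static void main(String[] args) {
        long[] syncOffsets = {0, 120, 260, 410, 575}; // hypothetical marker positions
        long fileLength = 700;                        // hypothetical file size in bytes
        long splitSize = 250;                         // hypothetical split size in bytes
        for (long start = 0; start < fileLength; start += splitSize) {
            long length = Math.min(splitSize, fileLength - start);
            System.out.println("split [" + start + ", " + (start + length) + ") reads blocks at "
                    + blocksForSplit(syncOffsets, start, length));
        }
    }
}
```

In this model every block is read by exactly one split, even when the block's data straddles a split boundary.
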
>
>
> Usage:
>
> public static void main(String[] args) throws Exception {
>     final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>     final Path inPath = new Path(args[0]);
>
>     Schema schema = new Schema.Parser().parse(new File("/path/to/schemafile.avsc"));
>     DataSet<GenericRecord> dataSet = env.createInput(
>             new GenericAvroInputFormat(inPath, schema));
>     dataSet.map(new MapFunction<GenericRecord, Tuple2<Long,String>>() {
>         @Override
>         public Tuple2<Long,String> map(GenericRecord record) {
>             Long id = (Long) record.get("id");
>             String someString = record.get("somestring").toString();
>             return new Tuple2<>(id, someString);
>         }
>     }).writeAsText(args[1]);
>
>     env.execute();
> }
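
On the .toString() call in the map function above: with the generic reader, string fields typically come back as org.apache.avro.util.Utf8 (a CharSequence) rather than java.lang.String, so a direct (String) cast can fail with a ClassCastException. The sketch below shows the same effect using only the JDK. StringBuilder is a stand-in for Utf8, and CharSequenceField and asString are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;

public class CharSequenceField {

    /** Converts a field value to String without assuming its concrete class. */
    public static String asString(Object value) {
        return value == null ? null : value.toString();
    }

    public static void main(String[] args) {
        Map<String, Object> record = new HashMap<>();
        // StringBuilder stands in for a non-String CharSequence such as Avro's Utf8.
        record.put("somestring", new StringBuilder("Mozilla/5.0"));

        // Works for any CharSequence implementation.
        System.out.println(asString(record.get("somestring")));

        try {
            // Fails because the stored object is not a java.lang.String.
            String bad = (String) record.get("somestring");
            System.out.println(bad);
        } catch (ClassCastException e) {
            System.out.println("cast failed for "
                    + record.get("somestring").getClass().getSimpleName());
        }
    }
}
```
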
>
>
> -Tarandeep
>
> On Fri, Apr 1, 2016 at 3:40 PM, Sourigna Phetsarath <
> gna.phetsar...@teamaol.com> wrote:
>
>> Tarandeep,
>>
>> There isn't a way yet, but I am proposing to do one:
>> https://issues.apache.org/jira/browse/FLINK-3691
>>
>> -Gna
>>
>> On Fri, Apr 1, 2016 at 4:04 AM, Tarandeep Singh <tarand...@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>> Can someone please point me to an example of creating DataSet using Avro
>>> Generic Records?
>>>
>>> I tried this code -
>>>
>>>     final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>>>     final Path iPath = new Path(args[0]);
>>>
>>>     DataSet<GenericRecord> dataSet = env.createInput(
>>>             new AvroInputFormat<>(iPath, GenericRecord.class));
>>>     dataSet.map(new MapFunction<GenericRecord, Tuple2<Integer,String>>() {
>>>         @Override
>>>         public Tuple2<Integer,String> map(GenericRecord record) {
>>>             Integer id = (Integer) record.get("id");
>>>             String userAgent = (String) record.get("user_agent");
>>>             return new Tuple2<>(id, userAgent);
>>>         }
>>>     }).writeAsText(args[1]);
>>>
>>>     env.execute();
>>>
>>> But I got an exception-
>>>
>>> Caused by: org.apache.avro.AvroRuntimeException: Not a Specific class: interface org.apache.avro.generic.GenericRecord
>>>     at org.apache.avro.specific.SpecificData.createSchema(SpecificData.java:276)
>>>     at org.apache.avro.reflect.ReflectData.createSchema(ReflectData.java:594)
>>>     at org.apache.avro.specific.SpecificData.getSchema(SpecificData.java:217)
>>>     at org.apache.avro.reflect.ReflectDatumReader.<init>(ReflectDatumReader.java:50)
>>>     at org.apache.flink.api.java.io.AvroInputFormat.open(AvroInputFormat.java:100)
>>>     at org.apache.flink.api.java.io.AvroInputFormat.open(AvroInputFormat.java:41)
>>>     at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:147)
>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>     at java.lang.Thread.run(Thread.java:745)
>>>
>>> From the stack trace, it looks like AvroInputFormat tries to read the Avro
>>> file as SpecificRecords. Is there a way to read an Avro file as GenericRecords?
>>>
>>>
>>> Thanks,
>>> Tarandeep
>>>
>>
>>
>>
>> --
>>
>>
>> *Gna Phetsarath*
>> System Architect // AOL Platforms // Data Services //
>> Applied Research Chapter
>> 770 Broadway, 5th Floor, New York, NY 10003
>> o: 212.402.4871 // m: 917.373.7363
>> vvmr: 8890237 aim: sphetsarath20 t: @sourigna
>>
>> * <http://www.aolplatforms.com>*
>>
>
>


-- 


*Gna Phetsarath*
System Architect // AOL Platforms // Data Services //
Applied Research Chapter
770 Broadway, 5th Floor, New York, NY 10003
o: 212.402.4871 // m: 917.373.7363
vvmr: 8890237 aim: sphetsarath20 t: @sourigna

* <http://www.aolplatforms.com>*
