Tarandeep,

Also, in your code example, when *reuseAvroValue* is *false* the code will
fail with this message:

java.lang.RuntimeException: The class 'org.apache.avro.generic.GenericRecord' is not instantiable: The class is no proper class, it is either abstract, an interface, or a primitive type.
    at org.apache.flink.util.InstantiationUtil.checkForInstantiation(InstantiationUtil.java:222)
    at org.apache.flink.util.InstantiationUtil.instantiate(InstantiationUtil.java:147)
    at org.apache.flink.util.InstantiationUtil.instantiate(InstantiationUtil.java:122)
at



I ran into this while I was writing the PR.
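
A possible way around this, as a sketch only: GenericRecord is an interface, so nothing can instantiate it reflectively, but Avro's reader allocates a fresh record itself when it is handed a null reuse object (fileReader.next() with no argument does the same). The snippet below shows that pattern with plain-JDK stand-ins so it runs without Avro or Flink on the classpath. ReuseSketch, Rec, MapRec and Reader are hypothetical names made up for illustration, not Avro or Flink classes.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

public class ReuseSketch {

    /** Hypothetical stand-in for an interface type such as GenericRecord. */
    public interface Rec {
        void put(String key, Object value);
        Object get(String key);
    }

    /** Hypothetical concrete record that the reader knows how to build. */
    public static final class MapRec implements Rec {
        private final Map<String, Object> fields = new HashMap<>();
        @Override public void put(String key, Object value) { fields.put(key, value); }
        @Override public Object get(String key) { return fields.get(key); }
    }

    /** Hypothetical reader: fills 'reuse' if given, otherwise allocates a new record. */
    public static final class Reader {
        private final Iterator<Long> ids;
        public Reader(List<Long> ids) { this.ids = ids.iterator(); }
        public boolean hasNext() { return ids.hasNext(); }
        public Rec next(Rec reuse) {
            Rec target = (reuse != null) ? reuse : new MapRec();
            target.put("id", ids.next());
            return target;
        }
    }

    /** Same shape as nextRecord(): the interface is never instantiated reflectively. */
    public static Rec nextRecord(Reader reader, Rec reuseValue, boolean reuseAvroValue) {
        if (!reader.hasNext()) {
            return null;
        }
        // When reuse is disabled, pass null so the reader creates the record itself.
        return reader.next(reuseAvroValue ? reuseValue : null);
    }

    /** Returns true if the class cannot be instantiated reflectively (e.g. an interface). */
    public static boolean failsToInstantiate(Class<?> clazz) {
        try {
            clazz.getDeclaredConstructor().newInstance();
            return false;
        } catch (ReflectiveOperationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("interface fails reflective instantiation: "
                + failsToInstantiate(Rec.class));
        Reader reader = new Reader(Arrays.asList(1L, 2L));
        Rec first = nextRecord(reader, null, false);
        Rec second = nextRecord(reader, first, false);
        System.out.println("distinct objects without reuse: " + (first != second));
    }
}
```

In the quoted GenericAvroInputFormat this would presumably mean dropping the InstantiationUtil call and using something like fileReader.next(reuseAvroValue ? reuseValue : null) instead. I have not run that exact change, so treat it as a suggestion.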

-Gna

On Thu, Apr 7, 2016 at 11:08 AM, Sourigna Phetsarath <
gna.phetsar...@teamaol.com> wrote:

> Tarandeep,
>
> Thanks for pasting your code!
>
> I have a PR ready that extends AvroInputFormat and will submit it soon.
>
> Still waiting for the legal team at AOL to approve it.
>
> -Gna
>
> On Sat, Apr 2, 2016 at 5:36 PM, Tarandeep Singh <tarand...@gmail.com>
> wrote:
>
>> Thank you Gna for opening the ticket.
>>
>> I looked into AvroInputFormat code and inspired by it I wrote a
>> GenericAvroInputFormat. The code is awfully similar (and hence redundant)
>> to original AvroInputFormat, so it is a good idea to modify AvroInputFormat
>> in flink to support GenericRecord.
>>
>> Anyway, I am pasting the code here for anyone who wants to use it (until
>> your code is part of a stable Flink release):
>>
>> import java.io.IOException;
>>
>> import org.apache.avro.Schema;
>> import org.apache.avro.file.DataFileReader;
>> import org.apache.avro.file.FileReader;
>> import org.apache.avro.file.SeekableInput;
>> import org.apache.avro.generic.GenericDatumReader;
>> import org.apache.avro.generic.GenericRecord;
>> import org.apache.avro.io.DatumReader;
>> import org.apache.flink.api.avro.FSDataInputStreamWrapper;
>> import org.apache.flink.api.common.io.FileInputFormat;
>> import org.apache.flink.api.common.typeinfo.TypeInformation;
>> import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
>> import org.apache.flink.api.java.typeutils.TypeExtractor;
>> import org.apache.flink.core.fs.FileInputSplit;
>> import org.apache.flink.core.fs.Path;
>> import org.apache.flink.util.InstantiationUtil;
>>
>> public class GenericAvroInputFormat extends FileInputFormat<GenericRecord> 
>> implements ResultTypeQueryable<GenericRecord> {
>>
>>     private transient long end;
>>     private transient Schema schema;
>>     private transient FileReader<GenericRecord> fileReader;
>>     private boolean reuseAvroValue = true;
>>
>>     private static final long serialVersionUID = 1L;
>>
>>     public GenericAvroInputFormat(Path filePath, Schema schema) {
>>         super(filePath);
>>         this.schema = schema;
>>     }
>>
>>     public void setReuseAvroValue(boolean reuseAvroValue) {
>>         this.reuseAvroValue = reuseAvroValue;
>>     }
>>
>>     public void setUnsplittable(boolean unsplittable) {
>>         this.unsplittable = unsplittable;
>>     }
>>
>>     @Override
>>     public TypeInformation<GenericRecord> getProducedType() {
>>         return TypeExtractor.getForClass(GenericRecord.class);
>>     }
>>
>>     @Override
>>     public void open(FileInputSplit split) throws IOException {
>>         super.open(split);
>>         SeekableInput sin = new FSDataInputStreamWrapper(stream, 
>> split.getPath().getFileSystem().getFileStatus(split.getPath()).getLen());
>>         DatumReader<GenericRecord> reader = new GenericDatumReader<>();
>>         fileReader = DataFileReader.openReader(sin, reader);
>>         fileReader.sync(split.getStart());
>>         this.end = split.getStart() + split.getLength();
>>     }
>>
>>     @Override
>>     public boolean reachedEnd() throws IOException {
>>         return !fileReader.hasNext() || fileReader.pastSync(end);
>>     }
>>
>>     @Override
>>     public GenericRecord nextRecord(GenericRecord reuseValue) throws 
>> IOException {
>>         if (reachedEnd()) {
>>             return null;
>>         }
>>
>>         if (!reuseAvroValue) {
>>             reuseValue = InstantiationUtil.instantiate(GenericRecord.class, 
>> Object.class);
>>         }
>>
>>         reuseValue = fileReader.next(reuseValue);
>>         return reuseValue;
>>     }
>> }
>>
>>
>> Usage:
>>
>> public static void main(String[] args) throws Exception {
>>     final ExecutionEnvironment env = 
>> ExecutionEnvironment.getExecutionEnvironment();
>>     final Path inPath = new Path(args[0]);
>>
>>     Schema schema = new Schema.Parser().parse(new 
>> File("/path/to/schemafile.avsc"));
>>     DataSet<GenericRecord> dataSet = env.createInput(new 
>> GenericAvroInputFormat(inPath, schema));
>>     dataSet.map(new MapFunction<GenericRecord, Tuple2<Long,String>>() {
>>         @Override
>>         public Tuple2<Long,String> map(GenericRecord record) {
>>             Long id = (Long) record.get("id");
>>             String someString = record.get("somestring").toString();
>>             return new Tuple2<>(id, someString);
>>         }
>>     }).writeAsText(args[1]);
>>
>>     env.execute();
>> }
>>
>>
>> -Tarandeep
>>
>>
>>
>>
>>
>>
>>
>> On Fri, Apr 1, 2016 at 3:40 PM, Sourigna Phetsarath <
>> gna.phetsar...@teamaol.com> wrote:
>>
>>> Tarandeep,
>>>
>>> There isn't a way yet, but I am proposing to add one:
>>> https://issues.apache.org/jira/browse/FLINK-3691
>>>
>>> -Gna
>>>
>>> On Fri, Apr 1, 2016 at 4:04 AM, Tarandeep Singh <tarand...@gmail.com>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> Can someone please point me to an example of creating DataSet using
>>>> Avro Generic Records?
>>>>
>>>> I tried this code -
>>>>
>>>>     final ExecutionEnvironment env = 
>>>> ExecutionEnvironment.getExecutionEnvironment();
>>>>     final Path iPath = new Path(args[0]);
>>>>
>>>>     DataSet<GenericRecord> dataSet = env.createInput(new 
>>>> AvroInputFormat<>(iPath, GenericRecord.class));
>>>>     dataSet.map(new MapFunction<GenericRecord, Tuple2<Integer,String>>() {
>>>>         @Override
>>>>         public Tuple2<Integer,String> map(GenericRecord record) {
>>>>             Integer id = (Integer) record.get("id");
>>>>             String userAgent = (String) record.get("user_agent");
>>>>             return new Tuple2<>(id, userAgent);
>>>>         }
>>>>     }).writeAsText(args[1]);
>>>>
>>>>     env.execute();
>>>>
>>>> But I got an exception-
>>>>
>>>> Caused by: org.apache.avro.AvroRuntimeException: Not a Specific class: interface org.apache.avro.generic.GenericRecord
>>>>     at org.apache.avro.specific.SpecificData.createSchema(SpecificData.java:276)
>>>>     at org.apache.avro.reflect.ReflectData.createSchema(ReflectData.java:594)
>>>>     at org.apache.avro.specific.SpecificData.getSchema(SpecificData.java:217)
>>>>     at org.apache.avro.reflect.ReflectDatumReader.<init>(ReflectDatumReader.java:50)
>>>>     at org.apache.flink.api.java.io.AvroInputFormat.open(AvroInputFormat.java:100)
>>>>     at org.apache.flink.api.java.io.AvroInputFormat.open(AvroInputFormat.java:41)
>>>>     at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:147)
>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>>     at java.lang.Thread.run(Thread.java:745)
>>>>
>>>> Looking at the stack trace, I gather that AvroInputFormat tries to read
>>>> the Avro file as SpecificRecords. Is there a way to read an Avro file as
>>>> GenericRecords?
>>>>
>>>>
>>>> Thanks,
>>>> Tarandeep
>>>>
>>>
>>>
>>>
>>> --
>>>
>>>
>>> *Gna Phetsarath*System Architect // AOL Platforms // Data Services //
>>> Applied Research Chapter
>>> 770 Broadway, 5th Floor, New York, NY 10003
>>> o: 212.402.4871 // m: 917.373.7363
>>> vvmr: 8890237 aim: sphetsarath20 t: @sourigna
>>>
>>> * <http://www.aolplatforms.com>*
>>>
>>
>>
>
>
>



