Example - Reading Avro Generic records

2016-04-01 Thread Tarandeep Singh
Hi,

Can someone please point me to an example of creating a DataSet of Avro
GenericRecords?

I tried this code -

final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
final Path iPath = new Path(args[0]);

DataSet<GenericRecord> dataSet = env.createInput(new AvroInputFormat<>(iPath, GenericRecord.class));
dataSet.map(new MapFunction<GenericRecord, Tuple2<Integer, String>>() {
    @Override
    public Tuple2<Integer, String> map(GenericRecord record) {
        Integer id = (Integer) record.get("id");
        String userAgent = (String) record.get("user_agent");
        return new Tuple2<>(id, userAgent);
    }
}).writeAsText(args[1]);

env.execute();

But I got an exception-

Caused by: org.apache.avro.AvroRuntimeException: Not a Specific class:
interface org.apache.avro.generic.GenericRecord
    at org.apache.avro.specific.SpecificData.createSchema(SpecificData.java:276)
    at org.apache.avro.reflect.ReflectData.createSchema(ReflectData.java:594)
    at org.apache.avro.specific.SpecificData.getSchema(SpecificData.java:217)
    at org.apache.avro.reflect.ReflectDatumReader.<init>(ReflectDatumReader.java:50)
    at org.apache.flink.api.java.io.AvroInputFormat.open(AvroInputFormat.java:100)
    at org.apache.flink.api.java.io.AvroInputFormat.open(AvroInputFormat.java:41)
    at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:147)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
    at java.lang.Thread.run(Thread.java:745)

Looking at the stack trace, I gather that AvroInputFormat tries to read the
Avro file as SpecificRecords. Is there a way to read an Avro file as
GenericRecords?


Thanks,
Tarandeep


Re: Example - Reading Avro Generic records

2016-04-01 Thread Sourigna Phetsarath
Tarandeep,

There isn't a way yet, but I am proposing to do one:
https://issues.apache.org/jira/browse/FLINK-3691

-Gna

On Fri, Apr 1, 2016 at 4:04 AM, Tarandeep Singh  wrote:

> Hi,
>
> Can someone please point me to an example of creating a DataSet of Avro
> GenericRecords?
>
> [...]
>
> Thanks,
> Tarandeep



-- 


*Gna Phetsarath* // System Architect // AOL Platforms // Data Services //
Applied Research Chapter
770 Broadway, 5th Floor, New York, NY 10003
o: 212.402.4871 // m: 917.373.7363
vvmr: 8890237 aim: sphetsarath20 t: @sourigna




Re: Example - Reading Avro Generic records

2016-04-02 Thread Tarandeep Singh
Thank you Gna for opening the ticket.

I looked into the AvroInputFormat code and, inspired by it, wrote a
GenericAvroInputFormat. The code is awfully similar (and hence redundant)
to the original AvroInputFormat, so it would be a good idea to modify
AvroInputFormat in Flink to support GenericRecord.

Anyway, I am pasting the code here for anyone who wants to use it (until
your change is part of a Flink stable release):

import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.FileReader;
import org.apache.avro.file.SeekableInput;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.flink.api.avro.FSDataInputStreamWrapper;
import org.apache.flink.api.common.io.FileInputFormat;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.fs.Path;
import org.apache.flink.util.InstantiationUtil;

public class GenericAvroInputFormat extends FileInputFormat<GenericRecord>
        implements ResultTypeQueryable<GenericRecord> {

    private transient long end;
    private transient Schema schema;
    private transient FileReader<GenericRecord> fileReader;
    private boolean reuseAvroValue = true;

    private static final long serialVersionUID = 1L;

    public GenericAvroInputFormat(Path filePath, Schema schema) {
        super(filePath);
        this.schema = schema;
    }

    public void setReuseAvroValue(boolean reuseAvroValue) {
        this.reuseAvroValue = reuseAvroValue;
    }

    public void setUnsplittable(boolean unsplittable) {
        this.unsplittable = unsplittable;
    }

    @Override
    public TypeInformation<GenericRecord> getProducedType() {
        return TypeExtractor.getForClass(GenericRecord.class);
    }

    @Override
    public void open(FileInputSplit split) throws IOException {
        super.open(split);
        SeekableInput sin = new FSDataInputStreamWrapper(stream,
                split.getPath().getFileSystem().getFileStatus(split.getPath()).getLen());
        DatumReader<GenericRecord> reader = new GenericDatumReader<>();
        fileReader = DataFileReader.openReader(sin, reader);
        fileReader.sync(split.getStart());
        this.end = split.getStart() + split.getLength();
    }

    @Override
    public boolean reachedEnd() throws IOException {
        return !fileReader.hasNext() || fileReader.pastSync(end);
    }

    @Override
    public GenericRecord nextRecord(GenericRecord reuseValue) throws IOException {
        if (reachedEnd()) {
            return null;
        }

        if (!reuseAvroValue) {
            reuseValue = InstantiationUtil.instantiate(GenericRecord.class, Object.class);
        }

        reuseValue = fileReader.next(reuseValue);
        return reuseValue;
    }
}
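A note on the split handling in open()/reachedEnd() above: each parallel
reader seeks forward to the first Avro sync marker at or after its split
start and stops once it has read past the split end, so every block is
consumed by exactly one reader. The ownership rule can be sketched without
any Avro dependency (the offsets below are hypothetical, not Avro API):

```java
import java.util.ArrayList;
import java.util.List;

// Sketch (not Avro API) of the split-reading rule: a split
// [start, start+len) owns every block whose sync marker m satisfies
// start <= m < start+len; sync() seeks forward to the first marker
// at or after 'start', pastSync(end) stops once past 'end'.
public class SyncSplitSketch {

    // Hypothetical block start offsets in a file of length 100.
    static final long[] SYNC_MARKS = {0, 30, 55, 80};

    static List<Long> blocksForSplit(long start, long len) {
        List<Long> owned = new ArrayList<>();
        for (long m : SYNC_MARKS) {
            if (m >= start && m < start + len) {
                owned.add(m);
            }
        }
        return owned;
    }

    public static void main(String[] args) {
        // Two 50-byte splits cover all four blocks exactly once.
        System.out.println(blocksForSplit(0, 50));   // [0, 30]
        System.out.println(blocksForSplit(50, 50));  // [55, 80]
    }
}
```

This is why the format stays splittable: no record is read twice and none
is skipped, even when a split boundary falls in the middle of a block.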


Usage:

public static void main(String[] args) throws Exception {
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    final Path inPath = new Path(args[0]);

    Schema schema = new Schema.Parser().parse(new File("/path/to/schemafile.avsc"));
    DataSet<GenericRecord> dataSet = env.createInput(new GenericAvroInputFormat(inPath, schema));
    dataSet.map(new MapFunction<GenericRecord, Tuple2<Long, String>>() {
        @Override
        public Tuple2<Long, String> map(GenericRecord record) {
            Long id = (Long) record.get("id");
            String someString = record.get("somestring").toString();
            return new Tuple2<>(id, someString);
        }
    }).writeAsText(args[1]);

    env.execute();
}


-Tarandeep







On Fri, Apr 1, 2016 at 3:40 PM, Sourigna Phetsarath <
gna.phetsar...@teamaol.com> wrote:

> Tarandeep,
>
> There isn't a way yet, but I am proposing to do one:
> https://issues.apache.org/jira/browse/FLINK-3691
>
> -Gna
>
> [...]

Re: Example - Reading Avro Generic records

2016-04-07 Thread Sourigna Phetsarath
Tarandeep,

Thanks for pasting your code!

I have a PR ready that extends AvroInputFormat and will submit it soon.

Still waiting for the legal team at AOL to approve it.

-Gna

On Sat, Apr 2, 2016 at 5:36 PM, Tarandeep Singh  wrote:

> Thank you Gna for opening the ticket.
>
> [...]
>
> -Tarandeep

Re: Example - Reading Avro Generic records

2016-04-07 Thread Sourigna Phetsarath
Tarandeep,

Also, in your code example, when *reuseAvroValue* is *false* the code will
fail with this message:

java.lang.RuntimeException: The class
'org.apache.avro.generic.GenericRecord' is not instantiable: The class is
no proper class, it is either abstract, an interface, or a primitive type.
    at org.apache.flink.util.InstantiationUtil.checkForInstantiation(InstantiationUtil.java:222)
    at org.apache.flink.util.InstantiationUtil.instantiate(InstantiationUtil.java:147)
    at org.apache.flink.util.InstantiationUtil.instantiate(InstantiationUtil.java:122)
    at ...


I had encountered this while writing the PR.

-Gna
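The failure above is inherent to reflective instantiation: GenericRecord is
an interface, so it has no constructor for InstantiationUtil to call. A
dependency-free sketch of the same behaviour (using java.util.List as a
stand-in interface, not anything Avro-specific):

```java
import java.util.List;

// Shows why reflectively instantiating an interface fails: interfaces
// have no constructors, so getDeclaredConstructor() throws, exactly as
// InstantiationUtil does for GenericRecord in the trace above.
public class InterfaceInstantiation {

    static boolean canInstantiate(Class<?> clazz) {
        try {
            clazz.getDeclaredConstructor().newInstance();
            return true;
        } catch (ReflectiveOperationException | RuntimeException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // An interface: no constructor, so instantiation fails.
        System.out.println(canInstantiate(List.class));          // false
        // A concrete class with a no-arg constructor works fine.
        System.out.println(canInstantiate(StringBuilder.class)); // true
    }
}
```

The practical fix in nextRecord() is to allocate a concrete implementation
such as new GenericData.Record(schema) instead of asking the utility to
instantiate the interface.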

On Thu, Apr 7, 2016 at 11:08 AM, Sourigna Phetsarath <
gna.phetsar...@teamaol.com> wrote:

> Tarandeep,
>
> Thanks for pasting your code!
>
> I have a PR ready that extends AvroInputFormat and will submit it soon.
>
> [...]