Hi,

Can someone please point me to an example of creating a DataSet from Avro
GenericRecords?

I tried this code:

    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    final Path iPath = new Path(args[0]);

    DataSet<GenericRecord> dataSet = env.createInput(
            new AvroInputFormat<>(iPath, GenericRecord.class));
    dataSet.map(new MapFunction<GenericRecord, Tuple2<Integer,String>>() {
        @Override
        public Tuple2<Integer,String> map(GenericRecord record) {
            Integer id = (Integer) record.get("id");
            String userAgent = (String) record.get("user_agent");
            return new Tuple2<>(id, userAgent);
        }
    }).writeAsText(args[1]);

    env.execute();

But I got an exception:

Caused by: org.apache.avro.AvroRuntimeException: Not a Specific class: interface org.apache.avro.generic.GenericRecord
    at org.apache.avro.specific.SpecificData.createSchema(SpecificData.java:276)
    at org.apache.avro.reflect.ReflectData.createSchema(ReflectData.java:594)
    at org.apache.avro.specific.SpecificData.getSchema(SpecificData.java:217)
    at org.apache.avro.reflect.ReflectDatumReader.<init>(ReflectDatumReader.java:50)
    at org.apache.flink.api.java.io.AvroInputFormat.open(AvroInputFormat.java:100)
    at org.apache.flink.api.java.io.AvroInputFormat.open(AvroInputFormat.java:41)
    at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:147)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
    at java.lang.Thread.run(Thread.java:745)

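From the stack trace, it looks like AvroInputFormat.open() builds a
ReflectDatumReader for the class I pass in, so it tries to read the Avro file
as SpecificRecords and fails because GenericRecord is only an interface.
Is there a way to read an Avro file as GenericRecords?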
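
For comparison, here is a minimal sketch of what I mean by reading generic
records, using the plain Avro API outside Flink. The class and method names
(GenericAvroRead, readIdAndUserAgent) are made up for illustration, and it
assumes the file has "id" and "user_agent" fields as in my job above. It does
not go through AvroInputFormat, so it is not a Flink solution.

```java
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.avro.file.DataFileReader;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;

// Hypothetical helper class, for illustration only.
public class GenericAvroRead {

    // Reads every record of an Avro container file as a GenericRecord.
    // No generated class is needed: GenericDatumReader picks up the
    // writer schema stored in the file header.
    static List<String> readIdAndUserAgent(File avroFile) throws IOException {
        List<String> rows = new ArrayList<>();
        try (DataFileReader<GenericRecord> reader =
                 new DataFileReader<>(avroFile, new GenericDatumReader<GenericRecord>())) {
            for (GenericRecord record : reader) {
                // Avro strings typically come back as org.apache.avro.util.Utf8,
                // so convert with String.valueOf instead of casting to String.
                rows.add(record.get("id") + "," + String.valueOf(record.get("user_agent")));
            }
        }
        return rows;
    }

    public static void main(String[] args) throws IOException {
        // args[0] is the path to a local Avro file (assumption).
        readIdAndUserAgent(new File(args[0])).forEach(System.out::println);
    }
}
```

What I am looking for is the equivalent of this inside a Flink DataSet source.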


Thanks,
Tarandeep
