Dear Sunita,

Thank you for your fast answer.

It's not exactly what I expected.
I am using Apache Avro C++ and I would like to deserialize an .avro file.
With just one .avro file and the Avro C++ functions, I am trying to extract the
schema, the keys in the schema, and the data at the end of the file.

I saw different functions. For example, in the file at <>, the program uses
load(...) to import the schema and then uses it in avro::resolvingDecoder.
In my case, I can't import the schema for deserialization: I only have the
.avro file, and I'm looking for a function to extract the information stored
in this file (the schema, the keys and the values). I need this
information because I then have to create a new object in Matlab, using Mex
functions (in C++).
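
To make the goal concrete: the schema is stored inside the .avro file itself, in the file header. Below is a minimal sketch of reading that header, assuming the Avro object container file layout (the 4-byte magic "Obj" followed by 0x01, a metadata map of string to bytes, then a 16-byte sync marker) and using only the C++ standard library. AvroFileHeader, readLong, readBytes, readAvroHeader, the two wrapper functions and kSanityLimit are hypothetical names for illustration, not part of the Avro C++ API.

```cpp
#include <cassert>
#include <cstdint>
#include <fstream>
#include <istream>
#include <map>
#include <sstream>
#include <stdexcept>
#include <string>

// Arbitrary sanity limit for this sketch, to reject corrupt lengths and counts.
const std::int64_t kSanityLimit = 64LL * 1024 * 1024;

// Header of an Avro object container file.
struct AvroFileHeader {
    std::map<std::string, std::string> metadata;  // e.g. "avro.schema", "avro.codec"
    std::string syncMarker;                       // 16 raw bytes
};

// Reads one Avro "long": a variable-length, zigzag-encoded integer.
std::int64_t readLong(std::istream& in) {
    std::uint64_t raw = 0;
    for (int shift = 0; shift < 64; shift += 7) {
        int byte = in.get();
        if (byte == std::char_traits<char>::eof())
            throw std::runtime_error("unexpected end of Avro header");
        raw |= static_cast<std::uint64_t>(byte & 0x7F) << shift;
        if ((byte & 0x80) == 0) {
            // Undo zigzag encoding: 0 -> 0, 1 -> -1, 2 -> 1, ...
            return static_cast<std::int64_t>(raw >> 1) ^
                   -static_cast<std::int64_t>(raw & 1);
        }
    }
    throw std::runtime_error("varint too long");
}

// Reads an Avro "string" or "bytes" value: a long length, then that many bytes.
std::string readBytes(std::istream& in) {
    std::int64_t n = readLong(in);
    if (n < 0 || n > kSanityLimit)
        throw std::runtime_error("implausible length in Avro header");
    std::string buf(static_cast<std::size_t>(n), '\0');
    if (n > 0 && !in.read(&buf[0], static_cast<std::streamsize>(n)))
        throw std::runtime_error("unexpected end of Avro header");
    return buf;
}

// Parses magic, metadata map and sync marker from the start of the stream.
AvroFileHeader readAvroHeader(std::istream& in) {
    char magic[4] = {0, 0, 0, 0};
    if (!in.read(magic, 4) || std::string(magic, 4) != std::string("Obj\x01", 4))
        throw std::runtime_error("not an Avro object container file");

    AvroFileHeader header;
    // The metadata is an Avro map<string, bytes>, written as a series of blocks.
    for (;;) {
        std::int64_t count = readLong(in);
        if (count == 0) break;  // a zero count ends the map
        if (count < -kSanityLimit || count > kSanityLimit)
            throw std::runtime_error("implausible block count in Avro header");
        if (count < 0) {        // negative count: the block's byte size follows
            count = -count;
            readLong(in);       // byte size, not needed when reading every entry
        }
        assert(count > 0 && count <= kSanityLimit);
        for (std::int64_t i = 0; i < count; ++i) {
            std::string key = readBytes(in);
            header.metadata[key] = readBytes(in);
        }
    }
    header.syncMarker.resize(16);
    if (!in.read(&header.syncMarker[0], 16))
        throw std::runtime_error("missing sync marker");
    return header;
}

// Convenience wrapper: read the header from a file on disk.
AvroFileHeader readAvroHeaderFromFile(const std::string& path) {
    std::ifstream in(path, std::ios::binary);
    if (!in) throw std::runtime_error("cannot open " + path);
    return readAvroHeader(in);
}

// Convenience wrapper: read the header from bytes already in memory.
AvroFileHeader readAvroHeaderFromBytes(const std::string& bytes) {
    std::istringstream in(bytes);
    return readAvroHeader(in);
}
```

With this, the schema JSON would be readAvroHeaderFromFile("avroProgram2.avro").metadata.at("avro.schema"), and the field names (keys) can be taken from that JSON. Decoding the records that follow the header still needs a schema-driven decoder and the codec named under "avro.codec", which this sketch does not attempt. As far as I can tell, avro::DataFileReader in Avro C++ (DataFile.hh) reads this same header itself and exposes the writer schema through dataSchema(), so it may already cover this case without a separate schema file.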

Thank you for your time,
Nicolas Ranc

2016-07-27 19:10 GMT+02:00 Sunita Arvind <>:

> For benefit of anyone else hitting the same issue, here is what I found:
> The serializer I was using was extending AbstractAvroEventSerializer. This
> class has a lot of adoption, so it's not likely to be an issue in the
> abstract class. However, I got rid of this issue by overriding the
> configure method in AbstractAvroEventSerializer in my custom serializer, as
> below:
> public void configure(Context context) {
>     int syncIntervalBytes = context.getInteger("syncIntervalBytes",
>             Integer.valueOf(2048000)).intValue();
>     String compressionCodec = context.getString("compressionCodec", "null");
>     this.writer = new ReflectDatumWriter(this.getSchema());
>     this.dataFileWriter = new DataFileWriter(this.writer);
>     this.dataFileWriter.setSyncInterval(syncIntervalBytes);
>     try {
>         CodecFactory e = CodecFactory.fromString(compressionCodec);
>         this.dataFileWriter.setCodec(e);
>         // Added: create (open) the writer here so that append() no longer fails with "not open".
>         this.dataFileWriter.create(schema, out);
>     } catch (AvroRuntimeException var5) {
>         logger.warn("Unable to instantiate avro codec with name (" +
>                 compressionCodec + "). Compression disabled. Exception follows.", var5);
>     } catch (IOException io) {
>         // Pass the exception itself so the logger prints the stack trace.
>         logger.warn("Could not open dataFileWriter. Exception follows.", io);
>     }
> }
> After this, the files are getting created in hdfs just right.
> I was also able to view the files in spark using spark-avro package.
> Hope this is the right way to do it and the solution helps someone.
> Would love to hear if anyone in avro or flume community knows of a better way 
> to do it.
> regards
> Sunita
> On Tue, Jul 26, 2016 at 12:45 PM, Sunita Arvind <>
> wrote:
>> Hello Experts,
>> I am trying to convert a custom data source received in flume into avro
>> and push to hdfs. What I am attempting to do is
>> syslog -> flume -> flume interceptor to convert into
>> avroObject.toByteArray -> hdfs serializer which decodes the byteArray back
>> to Avro
>> The flume configuration looks like:
>> tier1.sources.syslogsource.interceptors.i2.type=timestamp
>> tier1.sources.syslogsource.interceptors.i2.preserveExisting=true
>> tier1.sources.syslogsource.interceptors.i1.dataSourceType=DataSource1
>> tier1.sources.syslogsource.interceptors.i1.type =
>> com.flume.CustomToAvroConvertInterceptor$Builder
>> #hdfs sink for archival and batch analysis
>> tier1.sinks.hdfssink.type = hdfs
>> tier1.sinks.hdfssink.hdfs.writeFormat = Text
>> tier1.sinks.hdfssink.hdfs.fileType = DataStream
>> tier1.sinks.hdfssink.hdfs.filePrefix=%{flumeHost}-%{host}%{customerId}-%Y%m%d-%H
>> tier1.sinks.hdfssink.hdfs.inUsePrefix=_
>> tier1.sinks.hdfssink.hdfs.path=/hive/rawavro/customer_id=%{customerId}/date=%Y%m%d/hr=%H
>> tier1.sinks.hdfssink.hdfs.fileSuffix=.avro
>> # roll file if it's been 10 * 60 seconds = 600
>> tier1.sinks.hdfssink.hdfs.rollInterval=600
>> # roll file if we get 50,000 log lines (~25MB)
>> tier1.sinks.hdfssink.hdfs.rollCount=0
>> tier1.sinks.hdfssink.hdfs.batchSize = 100
>> tier1.sinks.hdfssink.hdfs.rollSize=0
>> tier1.sinks.hdfssink.serializer=com.flume.RawAvroHiveSerializer$Builder
>> tier1.sinks.hdfssink.serializer.compressionCodec=snappy
>> = hdfsmem
>> When I use tier1.sinks.hdfssink.serializer=avro_event
>> I get binary data stored into hdfs which is the
>> CustomToAvroConvertInterceptor.intercept(event.getbody).toByteArray ,
>> however this data cannot be parsed in hive. As a result, I see all nulls in
>> the column values.
>> Based on -
>> ?
>> all I am doing in RawAvroHiveSerializer.convert is to decode using binary
>> Decoder.
>> The exception I get seems to be unrelated to the code itself, hence
>> pasting the stack trace. Will share the code if it is required to identify
>> the root cause:
>> 2016-07-26 19:15:27,187 ERROR org.apache.flume.SinkRunner: Unable to
>> deliver event. Exception follows.
>> org.apache.flume.EventDeliveryException:
>> org.apache.avro.AvroRuntimeException: not open
>>         at
>> org.apache.flume.sink.hdfs.HDFSEventSink.process(
>>         at
>> org.apache.flume.sink.DefaultSinkProcessor.process(
>>         at
>> org.apache.flume.SinkRunner$
>>         at
>> Caused by: org.apache.avro.AvroRuntimeException: not open
>>         at
>> org.apache.avro.file.DataFileWriter.assertOpen(
>>         at
>> org.apache.avro.file.DataFileWriter.append(
>>         at
>> org.apache.flume.serialization.AbstractAvroEventSerializer.write(
>>         at
>> org.apache.flume.sink.hdfs.HDFSDataStream.append(
>>         at
>> org.apache.flume.sink.hdfs.BucketWriter$
>>         at
>> org.apache.flume.sink.hdfs.BucketWriter$
>>         at
>> org.apache.flume.sink.hdfs.BucketWriter$9$
>>         at
>> org.apache.flume.auth.SimpleAuthenticator.execute(
>> I can reproduce this on the local file system as well. In the test case, I tried
>> opening the file with append=true and still encountered the same exception.
>> Appreciate any guidance in this regard.
>> regards
>> Sunita

Attachment: avroProgram2.avro
Description: Binary data
