Dear Sunita,

Thank you for your quick answer, but it is not exactly what I expected. I am
using Apache Avro C++ and I would like to deserialize an .avro file
("avroProgram2.avro"). Starting from that one .avro file and the Avro C++
functions alone, I am trying to extract the schema, the keys in the schema,
and the data at the end of the file.

I saw different functions in the examples of the API documentation
(http://avro.apache.org/docs/1.7.6/api/cpp/html/): in the file resolving.cc,
the program uses load(...) to import a schema and then uses it in the
avro::resolvingDecoder. In my case I cannot import such a schema for
deserialization: I only have the .avro file, and I am looking for the
functions that extract its information (the schema, the keys, and the
values). I need this information because I then have to create new objects
in Matlab, using MEX functions written in C++.
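To make it concrete, here is roughly what I am trying to write (a minimal
sketch based on my reading of the DataFileReader / GenericDatum API in the
1.7.6 docs; I have not compiled or verified it yet):

    #include <iostream>
    #include <string>
    #include <avro/DataFile.hh>
    #include <avro/Generic.hh>
    #include <avro/ValidSchema.hh>

    int main() {
        // An Avro data file embeds the writer's schema in its header,
        // so no separate schema file / load(...) call should be needed.
        avro::DataFileReader<avro::GenericDatum> reader("avroProgram2.avro");

        // 1. The schema, printed as JSON text.
        const avro::ValidSchema& schema = reader.readerSchema();
        schema.toJson(std::cout);

        // 2. The records: field names (the "keys") and field values.
        avro::GenericDatum datum(schema);
        while (reader.read(datum)) {
            if (datum.type() == avro::AVRO_RECORD) {
                const avro::GenericRecord& rec =
                    datum.value<avro::GenericRecord>();
                for (size_t i = 0; i < rec.fieldCount(); ++i) {
                    const std::string& key = rec.schema()->nameAt(i);
                    const avro::GenericDatum& value = rec.fieldAt(i);
                    // Extract with value.value<double>(),
                    // value.value<std::string>(), ... according to
                    // value.type().
                    std::cout << key << " : type " << value.type() << "\n";
                }
            }
        }
        reader.close();
        return 0;
    }

If DataFileReader is the right entry point, my plan is then to copy each
key/value pair into a Matlab struct on the MEX side (mxCreateStructMatrix /
mxSetField). Is this the intended way to get the schema and the data out of
a single .avro file, without load(...)?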
Thank you for your time,
Nicolas Ranc

2016-07-27 19:10 GMT+02:00 Sunita Arvind <sunitarv...@gmail.com>:

> For the benefit of anyone else hitting the same issue, here is what I found:
>
> The serializer I was using extended AbstractAvroEventSerializer. That class
> has a lot of adoption, so the issue is unlikely to be in the abstract class
> itself. However, I got rid of the problem by overriding the configure
> method of AbstractAvroEventSerializer in my custom serializer, as below:
>
>     public void configure(Context context) {
>         int syncIntervalBytes = context.getInteger("syncIntervalBytes",
>                 Integer.valueOf(2048000)).intValue();
>         String compressionCodec = context.getString("compressionCodec", "null");
>         this.writer = new ReflectDatumWriter(this.getSchema());
>         this.dataFileWriter = new DataFileWriter(this.writer);
>         this.dataFileWriter.setSyncInterval(syncIntervalBytes);
>         try {
>             CodecFactory e = CodecFactory.fromString(compressionCodec);
>             this.dataFileWriter.setCodec(e);
>             this.dataFileWriter.create(schema, out);  // --> added the creation
>         } catch (AvroRuntimeException var5) {
>             logger.warn("Unable to instantiate avro codec with name ("
>                     + compressionCodec + "). Compression disabled. Exception follows.", var5);
>         } catch (IOException io) {
>             logger.warn("Could not open dataFileWriter. Exception follows.",
>                     io.getStackTrace());
>         }
>     }
>
> After this, the files are getting created in HDFS just right. I was also
> able to view the files in Spark using the spark-avro package. Hope this is
> the right way to do it and that the solution helps someone. I would love to
> hear if anyone in the Avro or Flume community knows of a better way to do it.
>
> regards
> Sunita
>
> On Tue, Jul 26, 2016 at 12:45 PM, Sunita Arvind <sunitarv...@gmail.com> wrote:
>
>> Hello Experts,
>>
>> I am trying to convert a custom data source received in Flume into Avro
>> and push it to HDFS. What I am attempting to do is:
>>
>>     syslog -> flume -> flume interceptor that converts to avroObject.toByteArray
>>            -> hdfs serializer that decodes the byteArray back to Avro
>>
>> The Flume configuration looks like:
>>
>>     tier1.sources.syslogsource.interceptors.i2.type=timestamp
>>     tier1.sources.syslogsource.interceptors.i2.preserveExisting=true
>>     tier1.sources.syslogsource.interceptors.i1.dataSourceType=DataSource1
>>     tier1.sources.syslogsource.interceptors.i1.type = com.flume.CustomToAvroConvertInterceptor$Builder
>>
>>     # hdfs sink for archival and batch analysis
>>     tier1.sinks.hdfssink.type = hdfs
>>     tier1.sinks.hdfssink.hdfs.writeFormat = Text
>>     tier1.sinks.hdfssink.hdfs.fileType = DataStream
>>     tier1.sinks.hdfssink.hdfs.filePrefix=%{flumeHost}-%{host}%{customerId}-%Y%m%d-%H
>>     tier1.sinks.hdfssink.hdfs.inUsePrefix=_
>>     tier1.sinks.hdfssink.hdfs.path=/hive/rawavro/customer_id=%{customerId}/date=%Y%m%d/hr=%H
>>     tier1.sinks.hdfssink.hdfs.fileSuffix=.avro
>>     # roll file if it's been 10 * 60 seconds = 600
>>     tier1.sinks.hdfssink.hdfs.rollInterval=600
>>     # roll file if we get 50,000 log lines (~25MB)
>>     tier1.sinks.hdfssink.hdfs.rollCount=0
>>     tier1.sinks.hdfssink.hdfs.batchSize = 100
>>     tier1.sinks.hdfssink.hdfs.rollSize=0
>>     tier1.sinks.hdfssink.serializer=com.flume.RawAvroHiveSerializer$Builder
>>     tier1.sinks.hdfssink.serializer.compressionCodec=snappy
>>     tier1.sinks.hdfssink.channel = hdfsmem
>>
>> When I use tier1.sinks.hdfssink.serializer=avro_event, I get binary data
>> stored in HDFS, which is CustomToAvroConvertInterceptor.intercept(event.getBody()).toByteArray;
>> however, this data cannot be parsed in Hive. As a result, I see all nulls
>> in the column values.
>> Based on
>> https://cwiki.apache.org/confluence/display/AVRO/FAQ#FAQ-HowcanIserializedirectlyto/fromabytearray
>> all I am doing in RawAvroHiveSerializer.convert is to decode using a
>> binary decoder.
>> The exception I get seems unrelated to the code itself, hence I am pasting
>> the stack trace. I will share the code if it is required to identify the
>> root cause:
>>
>>     2016-07-26 19:15:27,187 ERROR org.apache.flume.SinkRunner: Unable to deliver event. Exception follows.
>>     org.apache.flume.EventDeliveryException: org.apache.avro.AvroRuntimeException: not open
>>         at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:463)
>>         at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
>>         at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
>>         at java.lang.Thread.run(Thread.java:745)
>>     Caused by: org.apache.avro.AvroRuntimeException: not open
>>         at org.apache.avro.file.DataFileWriter.assertOpen(DataFileWriter.java:82)
>>         at org.apache.avro.file.DataFileWriter.append(DataFileWriter.java:299)
>>         at org.apache.flume.serialization.AbstractAvroEventSerializer.write(AbstractAvroEventSerializer.java:108)
>>         at org.apache.flume.sink.hdfs.HDFSDataStream.append(HDFSDataStream.java:124)
>>         at org.apache.flume.sink.hdfs.BucketWriter$7.call(BucketWriter.java:550)
>>         at org.apache.flume.sink.hdfs.BucketWriter$7.call(BucketWriter.java:547)
>>         at org.apache.flume.sink.hdfs.BucketWriter$9$1.run(BucketWriter.java:679)
>>         at org.apache.flume.auth.SimpleAuthenticator.execute(SimpleAuthenticator.java:50)
>>
>> I can reproduce this on the local file system as well. In the test case, I
>> tried opening the file with append=true and still encounter the same
>> exception.
>>
>> Appreciate any guidance in this regard.
>>
>> regards
>> Sunita