Hi Dan,

Thanks for sharing!  Glad you got it working =)

I'm sure this will be very helpful for future Flume HDFS Avro users.

Best,

Ed


On Sat, Feb 8, 2014 at 10:46 AM, Daniel Rodriguez <[email protected]> wrote:

> OK, after some help on the Avro mailing list I came up with a solution.
>
> I was on the right path: with the Avro schema in the Flume header and the
> binary data from the values in the body, it is possible to reconstruct the
> original Avro file. So now I don't need to define the schema in a class that
> extends AbstractAvroEventSerializer; the serializer works for any Avro file,
> which is what I wanted.
>
> The serializer I wrote is a little bit hacky and dirty, since there are some
> private attributes (dataFileWriter) in AbstractAvroEventSerializer that I
> need when I override the write method. So I implement EventSerializer
> directly and copied some code from AbstractAvroEventSerializer.
>
> I am sure it can be done better, but the solution is working. The code is below.
>
> Thanks,
> Daniel
>
>
> package org.apache.flume.serialization;
>
> import static org.apache.flume.serialization.AvroEventSerializerConfigurationConstants.COMPRESSION_CODEC;
> import static org.apache.flume.serialization.AvroEventSerializerConfigurationConstants.DEFAULT_COMPRESSION_CODEC;
> import static org.apache.flume.serialization.AvroEventSerializerConfigurationConstants.DEFAULT_SYNC_INTERVAL_BYTES;
> import static org.apache.flume.serialization.AvroEventSerializerConfigurationConstants.SYNC_INTERVAL_BYTES;
>
> import java.io.IOException;
> import java.io.OutputStream;
> import java.nio.ByteBuffer;
> import java.util.Map;
>
> import org.apache.avro.AvroRuntimeException;
> import org.apache.avro.Schema;
> import org.apache.avro.file.CodecFactory;
> import org.apache.avro.file.DataFileWriter;
> import org.apache.avro.io.DatumWriter;
> import org.apache.avro.reflect.ReflectDatumWriter;
> import org.apache.avro.util.Utf8;
> import org.apache.flume.Context;
> import org.apache.flume.Event;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
>
> /**
>  * Writes events whose body is an Avro-encoded record back into an Avro
>  * container file, using the schema carried in the
>  * flume.avro.schema.literal event header.
>  */
> public class AvroEventSerializer implements EventSerializer {
>
>   private static final Logger logger =
>       LoggerFactory.getLogger(AvroEventSerializer.class);
>
>   public Context context;
>
>   public Map<?, ?> headers;
>   public Schema schema;
>
>   private DatumWriter<Object> writer = null;
>   private DataFileWriter<Object> dataFileWriter = null;
>
>   private final OutputStream out;
>
>   private AvroEventSerializer(OutputStream out) {
>     this.out = out;
>   }
>
>   public void configure(Context context) {
>     if (headers == null) {
>       // The schema comes from the event headers and is not available yet.
>       return;
>     }
>     if (dataFileWriter != null) {
>       // Schema is known but the dataFileWriter was already created.
>       return;
>     }
>     // From this point the code runs only once, after the first event (and
>     // therefore the schema) has been received, so the dataFileWriter is
>     // only created once.
>
>     writer = new ReflectDatumWriter<Object>(getSchema());
>     dataFileWriter = new DataFileWriter<Object>(writer);
>
>     int syncIntervalBytes = context.getInteger(SYNC_INTERVAL_BYTES,
>         DEFAULT_SYNC_INTERVAL_BYTES);
>     dataFileWriter.setSyncInterval(syncIntervalBytes);
>
>     String compressionCodec = context.getString(COMPRESSION_CODEC,
>         DEFAULT_COMPRESSION_CODEC);
>     try {
>       CodecFactory codecFactory = CodecFactory.fromString(compressionCodec);
>       dataFileWriter.setCodec(codecFactory);
>     } catch (AvroRuntimeException e) {
>       logger.warn("Unable to instantiate avro codec with name ("
>           + compressionCodec
>           + "). Compression disabled. Exception follows.", e);
>     }
>   }
>
>   public Schema getSchema() {
>     if (schema == null) {
>       // Header keys may be String or Utf8; check the first key to decide
>       // which type to use for the lookup.
>       String schemaString;
>       if (headers.keySet().toArray()[0].getClass() == String.class) {
>         String schemaHeaderKey = "flume.avro.schema.literal";
>         schemaString = headers.get(schemaHeaderKey).toString();
>       } else {
>         Utf8 schemaHeaderKey = new Utf8("flume.avro.schema.literal");
>         schemaString = headers.get(schemaHeaderKey).toString();
>       }
>       Schema.Parser parser = new Schema.Parser();
>       schema = parser.parse(schemaString);
>     }
>     return schema;
>   }
>
>   public OutputStream getOutputStream() {
>     return out;
>   }
>
>   public Event convert(Event event) {
>     headers = event.getHeaders();
>     this.configure(context);
>     return event;
>   }
>
>   @Override
>   public void write(Event event) throws IOException {
>     convert(event);
>     ByteBuffer buf = ByteBuffer.wrap(event.getBody());
>     // Need to reconfigure since the schema is not defined the first time
>     // configure() runs.
>     this.configure(context);
>     afterCreate();
>     dataFileWriter.appendEncoded(buf);
>     // Uncomment below for running the test locally, comment for mvn build
>     // dataFileWriter.close();
>   }
>
>   public boolean afterCreateRun = false;
>
>   @Override
>   public void afterCreate() throws IOException {
>     if (afterCreateRun) {
>       return;
>     }
>     if (dataFileWriter != null) {
>       // Write the container file header (schema, sync marker) exactly once.
>       dataFileWriter.create(getSchema(), getOutputStream());
>       afterCreateRun = true;
>     }
>   }
>
>   @Override
>   public void afterReopen() throws IOException {
>     throw new UnsupportedOperationException("Avro API doesn't support append");
>   }
>
>   @Override
>   public void flush() throws IOException {
>     if (dataFileWriter != null) {
>       dataFileWriter.flush();
>     }
>   }
>
>   @Override
>   public void beforeClose() throws IOException {
>     // no-op
>   }
>
>   @Override
>   public boolean supportsReopen() {
>     return false;
>   }
>
>   public static class Builder implements EventSerializer.Builder {
>
>     @Override
>     public EventSerializer build(Context context, OutputStream out) {
>       AvroEventSerializer writer = new AvroEventSerializer(out);
>       writer.configure(context);
>       writer.context = context;
>       return writer;
>     }
>   }
> }
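>
> For completeness, the sink then points at the serializer's Builder the same
> way Ed described earlier (assuming the class above is packaged into a jar on
> the Flume classpath):
>
> a1.sinks.sink1.serializer = org.apache.flume.serialization.AvroEventSerializer$Builder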
>
> On Feb 7, 2014, at 9:19 AM, Daniel Rodriguez <[email protected]> wrote:
>
> Hi Ed,
>
> Thanks for your response. I was afraid the solution would be to write my
> own serializer; I'm not the most expert Java programmer :P
>
> But I think that is the only solution. Reading more in the docs:
>
> *This deserializer is able to read an Avro container file, and it
> generates one event per Avro record in the file. Each event is annotated
> with a header that indicates the schema used. The body of the event is the
> binary Avro record data, not including the schema or the rest of the
> container file elements.*
>
> So I tested using deserializer.schemaType = LITERAL and I can see a JSON
> header with the schema, and in the body I can see the binary data of the
> values. So I think it should be "easy" to write a serializer based on an
> example I found:
> https://github.com/brockn/avro-flume-hive-example/blob/master/src/main/java/com/cloudera/flume/serialization/FlumeEventStringBodyAvroEventSerializer.java
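>
> (In the spooldir source config from my first email, that just means adding,
> next to the existing deserializer = avro line:
>
> a1.sources.src1.deserializer.schemaType = LITERAL
>
> so the full schema JSON travels in the flume.avro.schema.literal header
> rather than just a hash of it, which I believe is the default.)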
>
> I was hoping that a general Avro serializer already existed, since there is
> a general deserializer that I am using in the SpoolDir source.
>
> I will post if I come up with a solution.
>
> Thanks
>
> On Feb 6, 2014, at 9:10 PM, ed <[email protected]> wrote:
>
> Hi Daniel,
>
> I think you will need to write a custom event serializer for the HDFS sink
> that extends AbstractAvroEventSerializer to write out your data using your
> specific Avro schema. Then in your agent configuration add it like this:
>
> a1.sinks.sink1.serializer = com.yourpackagename.CustomAvroEventSerializer$Builder
>
>
> As a quick test you can use the default Avro serializer
> (https://flume.apache.org/FlumeUserGuide.html#avro-event-serializer) like so:
>
> a1.sinks.sink1.serializer = avro_event
>
>
> I think this will end up just wrapping your Avro data in Flume's default
> schema, but at least you can see if valid Avro files are getting written to
> HDFS. Hope that gets you a little closer.
>
> Best,
>
> Ed
>
>
> On Fri, Feb 7, 2014 at 11:51 AM, Daniel Rodriguez <[email protected]> wrote:
>
>> Hi all,
>>
>> I have users writing Avro files on different servers and I want to use
>> Flume to move all those files into HDFS, so I can later use Hive or Pig to
>> query/analyse the data.
>>
>> On the client I installed Flume with a SpoolDir source and an Avro sink,
>> like this:
>>
>> a1.sources = src1
>> a1.sinks = sink1
>> a1.channels = c1
>>
>> a1.channels.c1.type = memory
>>
>> a1.sources.src1.type = spooldir
>> a1.sources.src1.channels = c1
>> a1.sources.src1.spoolDir = {directory}
>> a1.sources.src1.fileHeader = true
>> a1.sources.src1.deserializer = avro
>>
>> a1.sinks.sink1.type = avro
>> a1.sinks.sink1.channel = c1
>> a1.sinks.sink1.hostname = {IP}
>> a1.sinks.sink1.port = 41414
>>
>> On the Hadoop cluster I have this Avro source and HDFS sink:
>>
>> a1.sources = avro1
>> a1.sinks = sink1
>> a1.channels = c1
>>
>> a1.channels.c1.type = memory
>>
>> a1.sources.avro1.type = avro
>> a1.sources.avro1.channels = c1
>> a1.sources.avro1.bind = 0.0.0.0
>> a1.sources.avro1.port = 41414
>>
>> a1.sinks.sink1.type = hdfs
>> a1.sinks.sink1.channel = c1
>> a1.sinks.sink1.hdfs.path = {hdfs dir}
>> a1.sinks.sink1.hdfs.fileSuffix = .avro
>> a1.sinks.sink1.hdfs.rollSize = 67108864
>> a1.sinks.sink1.hdfs.fileType = DataStream
>>
>> The problem is that the files on HDFS are not valid Avro files! I am using
>> the Hue UI to check whether a file is a valid Avro file or not. If I upload
>> an Avro file that I generated on my PC to the cluster, I can see its
>> contents perfectly and even create a Hive table and query it, but the files
>> I send via Flume are not valid Avro files.
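>>
>> (For reference, a container file can also be sanity-checked locally with
>> the avro-tools jar, e.g. something like
>> java -jar avro-tools-<version>.jar tojson <file>.avro
>> which only prints the records as JSON when the file is a valid Avro
>> container; the jar name and file path here are placeholders.)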
>>
>> I tried the Avro client that is included with Flume, but that didn't work
>> because it sends one Flume event per line, breaking the Avro files; I fixed
>> that by using the spooldir source with deserializer = avro. So I think the
>> problem is in the HDFS sink when it writes the files.
>>
>> Using hdfs.fileType = DataStream it writes only the values from the Avro
>> fields, not the whole Avro file, losing all the schema information. If I
>> use hdfs.fileType = SequenceFile the files are not valid for some reason.
>>
>> I appreciate any help.
>>
>> Thanks,
>>
>> Daniel
>>
>
