Ok, after some help on the Avro mailing list I came up with a solution.
I was on the right track: with the Avro schema in the Flume header and the
binary data of the values in the event body, it is possible to reconstruct the
original Avro file. So now I don't need to define the schema in a class that
extends AbstractAvroEventSerializer; it will work for any Avro file, which is
what I wanted.
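
The core trick, stripped of the Flume plumbing, is just this (a minimal sketch
using the plain Avro generic API; the class and method names are made up for
illustration, and it handles a single record for simplicity):

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;

public class RebuildAvroFile {
    // schemaJson comes from the "flume.avro.schema.literal" header,
    // recordBytes is the binary-encoded record from the event body.
    public static byte[] rebuild(String schemaJson, byte[] recordBytes)
            throws IOException {
        Schema schema = new Schema.Parser().parse(schemaJson);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        DataFileWriter<GenericRecord> writer = new DataFileWriter<GenericRecord>(
                new GenericDatumWriter<GenericRecord>(schema));
        writer.create(schema, out);                         // container header + schema
        writer.appendEncoded(ByteBuffer.wrap(recordBytes)); // raw record bytes, no re-decoding
        writer.close();
        return out.toByteArray();                           // a valid Avro container file
    }
}
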
The serializer I wrote is a bit hacky and dirty, since some attributes I need
when I override the write method (dataFileWriter) are private in
AbstractAvroEventSerializer. So I am implementing EventSerializer directly and
copied some code from AbstractAvroEventSerializer.
I am sure it can be done better, but the solution is working. Code is below.
Thanks,
Daniel
package org.apache.flume.serialization;

import static org.apache.flume.serialization.AvroEventSerializerConfigurationConstants.COMPRESSION_CODEC;
import static org.apache.flume.serialization.AvroEventSerializerConfigurationConstants.DEFAULT_COMPRESSION_CODEC;
import static org.apache.flume.serialization.AvroEventSerializerConfigurationConstants.DEFAULT_SYNC_INTERVAL_BYTES;
import static org.apache.flume.serialization.AvroEventSerializerConfigurationConstants.SYNC_INTERVAL_BYTES;

import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.Map;

import org.apache.avro.AvroRuntimeException;
import org.apache.avro.Schema;
import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.reflect.ReflectDatumWriter;
import org.apache.avro.util.Utf8;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * EventSerializer that rebuilds an Avro container file from events whose
 * "flume.avro.schema.literal" header carries the schema and whose body
 * carries the binary-encoded record. Much of this is copied from
 * AbstractAvroEventSerializer, whose dataFileWriter is private and therefore
 * cannot be reused directly.
 */
public class AvroEventSerializer implements EventSerializer {

  private static final Logger logger =
      LoggerFactory.getLogger(AvroEventSerializer.class);

  public Context context;
  public Map headers;
  public Schema schema;

  private DatumWriter writer = null;
  private DataFileWriter dataFileWriter = null;
  private final OutputStream out;

  private AvroEventSerializer(OutputStream out) {
    this.out = out;
  }

  public void configure(Context context) {
    if (headers == null) {
      // The schema comes from the event headers and is not available yet
      return;
    } else if (dataFileWriter != null) {
      // The schema is known but the dataFileWriter was already created
      return;
    }
    // From this point on the code runs only once, after the schema has been
    // received, so the dataFileWriter is created exactly once
    writer = new ReflectDatumWriter(getSchema());
    dataFileWriter = new DataFileWriter(writer);
    int syncIntervalBytes =
        context.getInteger(SYNC_INTERVAL_BYTES, DEFAULT_SYNC_INTERVAL_BYTES);
    dataFileWriter.setSyncInterval(syncIntervalBytes);
    String compressionCodec =
        context.getString(COMPRESSION_CODEC, DEFAULT_COMPRESSION_CODEC);
    try {
      CodecFactory codecFactory = CodecFactory.fromString(compressionCodec);
      dataFileWriter.setCodec(codecFactory);
    } catch (AvroRuntimeException e) {
      logger.warn("Unable to instantiate avro codec with name ("
          + compressionCodec + "). Compression disabled. Exception follows.", e);
    }
  }

  public Schema getSchema() {
    if (schema == null) {
      // The header keys may arrive as String or as Avro Utf8 objects
      String schemaString;
      if (headers.keySet().toArray()[0].getClass() == String.class) {
        String schemaHeaderKey = "flume.avro.schema.literal";
        schemaString = headers.get(schemaHeaderKey).toString();
      } else {
        Utf8 schemaHeaderKey = new Utf8("flume.avro.schema.literal");
        schemaString = headers.get(schemaHeaderKey).toString();
      }
      Schema.Parser parser = new Schema.Parser();
      schema = parser.parse(schemaString);
    }
    return schema;
  }

  public OutputStream getOutputStream() {
    return out;
  }

  public Event convert(Event event) {
    headers = event.getHeaders();
    this.configure(context);
    return event;
  }

  public void write(Event event) throws IOException {
    convert(event);
    ByteBuffer buf = ByteBuffer.wrap(event.getBody());
    // Need to reconfigure, since the schema is not defined the first time
    // configure() runs
    this.configure(context);
    afterCreate();
    dataFileWriter.appendEncoded(buf);
    // Uncomment below for running the test locally, comment for mvn build
    // dataFileWriter.close();
  }

  public boolean afterCreateRun = false;

  public void afterCreate() throws IOException {
    if (afterCreateRun) {
      return;
    }
    if (dataFileWriter != null) {
      // Writes the container file header (including the schema) exactly once
      dataFileWriter.create(getSchema(), getOutputStream());
      afterCreateRun = true;
    }
  }

  public void afterReopen() throws IOException {
    throw new UnsupportedOperationException("Avro API doesn't support append");
  }

  public void flush() throws IOException {
    if (dataFileWriter != null) {
      dataFileWriter.flush();
    }
  }

  public void beforeClose() throws IOException {
    // no-op
  }

  public boolean supportsReopen() {
    return false;
  }

  public static class Builder implements EventSerializer.Builder {
    public EventSerializer build(Context context, OutputStream out) {
      AvroEventSerializer writer = new AvroEventSerializer(out);
      writer.configure(context);
      writer.context = context;
      return writer;
    }
  }
}
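
For reference, wiring it into the HDFS sink is just what Ed suggested below,
pointing at this class instead (sink name as in the configs quoted further
down):

a1.sinks.sink1.serializer = org.apache.flume.serialization.AvroEventSerializer$Builder
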
On Feb 7, 2014, at 9:19 AM, Daniel Rodriguez <[email protected]> wrote:
> Hi ed,
>
> Thanks for your response. I was afraid the solution would be to write my own
> serializer; I'm not the most expert Java programmer :P
>
> But I think that is the only solution, after reading more of the docs:
>
> This deserializer is able to read an Avro container file, and it generates
> one event per Avro record in the file. Each event is annotated with a header
> that indicates the schema used. The body of the event is the binary Avro
> record data, not including the schema or the rest of the container file
> elements.
>
> So I tested using deserializer.schemaType = LITERAL and I can see a JSON
> header with the schema, and in the body I can see the binary data of the
> values. So I think it should be “easy” to write a serializer based on an
> example I found:
> https://github.com/brockn/avro-flume-hive-example/blob/master/src/main/java/com/cloudera/flume/serialization/FlumeEventStringBodyAvroEventSerializer.java
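>
> Concretely, on my spooldir source that just means adding the schemaType line
> (the deserializer = avro line was already there in my config):
>
> a1.sources.src1.deserializer = avro
> a1.sources.src1.deserializer.schemaType = LITERAL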
>
> I was hoping that a general Avro serializer already existed, since there is
> such a deserializer, which I am using in the SpoolDir source.
>
> I will post if I come up with a solution.
>
> Thanks
>
> On Feb 6, 2014, at 9:10 PM, ed <[email protected]> wrote:
>
>> Hi Daniel,
>>
>> I think you will need to write a custom event serializer for the HDFSSink
>> that extends AbstractAvroEventSerializer to write out your data using your
>> specific Avro Schema. Then in your agent configuration add it like this:
>>
>> a1.sinks.sink1.serializer =
>> com.yourpackagename.CustomAvroEventSerializer$Builder
>>
>> As a quick test you can use the default avro serializer
>> (https://flume.apache.org/FlumeUserGuide.html#avro-event-serializer) like so:
>>
>> a1.sinks.sink1.serializer = avro_event
>>
>> I think this will end up just wrapping your avro data in Flume's default
>> schema but at least you can see if valid avro files are getting written to
>> HDFS. Hope that gets you a little closer.
>>
>> Best,
>>
>> Ed
>>
>>
>> On Fri, Feb 7, 2014 at 11:51 AM, Daniel Rodriguez
>> <[email protected]> wrote:
>> Hi all,
>>
>> I have users writing Avro files on different servers and I want to use Flume
>> to move all those files into HDFS, so I can later use Hive or Pig to
>> query/analyse the data.
>>
>> On the clients I installed Flume and have a SpoolDir source and an Avro sink
>> like this:
>>
>> a1.sources = src1
>> a1.sinks = sink1
>> a1.channels = c1
>>
>> a1.channels.c1.type = memory
>>
>> a1.sources.src1.type = spooldir
>> a1.sources.src1.channels = c1
>> a1.sources.src1.spoolDir = {directory}
>> a1.sources.src1.fileHeader = true
>> a1.sources.src1.deserializer = avro
>>
>> a1.sinks.sink1.type = avro
>> a1.sinks.sink1.channel = c1
>> a1.sinks.sink1.hostname = {IP}
>> a1.sinks.sink1.port = 41414
>>
>> On the Hadoop cluster I have this Avro source and HDFS sink:
>>
>> a1.sources = avro1
>> a1.sinks = sink1
>> a1.channels = c1
>>
>> a1.channels.c1.type = memory
>>
>> a1.sources.avro1.type = avro
>> a1.sources.avro1.channels = c1
>> a1.sources.avro1.bind = 0.0.0.0
>> a1.sources.avro1.port = 41414
>>
>> a1.sinks.sink1.type = hdfs
>> a1.sinks.sink1.channel = c1
>> a1.sinks.sink1.hdfs.path = {hdfs dir}
>> a1.sinks.sink1.hdfs.fileSuffix = .avro
>> a1.sinks.sink1.hdfs.rollSize = 67108864
>> a1.sinks.sink1.hdfs.fileType = DataStream
>>
>> The problem is that the files on HDFS are not valid Avro files! I am using
>> the Hue UI to check whether a file is a valid Avro file or not. If I upload
>> an Avro file that I generated on my PC to the cluster I can see its contents
>> perfectly, and even create and query a Hive table, but the files I send via
>> Flume are not valid Avro files.
>>
>> I tried the Avro client that is included with Flume, but it didn't work
>> because it sends one Flume event per line, breaking the Avro files; I fixed
>> that with the spooldir source using deserializer = avro. So I think the
>> problem is in the HDFS sink when it writes the files.
>>
>> Using hdfs.fileType = DataStream it writes only the values of the Avro
>> fields, not the whole Avro file, losing all the schema information. If I use
>> hdfs.fileType = SequenceFile the files are not valid for some reason.
>>
>> I appreciate any help.
>>
>> Thanks,
>>
>> Daniel
>>
>>
>