Hi Dan,

Thanks for sharing! Glad you got it working =)
I'm sure this will be very helpful for future Flume HDFS Avro users.

Best,
Ed

On Sat, Feb 8, 2014 at 10:46 AM, Daniel Rodriguez <[email protected]> wrote:

> Ok, after some help on the Avro mailing list I came up with a solution.
>
> I was on the right path: with the Avro schema in the Flume header field
> and the binary data from the values in the body field, it is possible to
> reconstruct the original Avro file. So now I don't need to define the
> schema in the class that extends AbstractAvroEventSerializer; it will work
> for any Avro file, which is what I wanted.
>
> The serializer I wrote is a little bit hacky and dirty, since there are
> some private attributes (dataFileWriter) in AbstractAvroEventSerializer
> that I need to use when I override the write method. So I am extending
> EventSerializer instead, and I copied some code from
> AbstractAvroEventSerializer.
>
> I am sure it can be done better, but the solution is working. Code is below.
>
> Thanks,
> Daniel
>
>
> package org.apache.flume.serialization;
>
> import static org.apache.flume.serialization.AvroEventSerializerConfigurationConstants.COMPRESSION_CODEC;
> import static org.apache.flume.serialization.AvroEventSerializerConfigurationConstants.DEFAULT_COMPRESSION_CODEC;
> import static org.apache.flume.serialization.AvroEventSerializerConfigurationConstants.DEFAULT_SYNC_INTERVAL_BYTES;
> import static org.apache.flume.serialization.AvroEventSerializerConfigurationConstants.SYNC_INTERVAL_BYTES;
>
> import java.io.IOException;
> import java.io.OutputStream;
> import java.nio.ByteBuffer;
> import java.util.Map;
>
> import org.apache.avro.AvroRuntimeException;
> import org.apache.avro.Schema;
> import org.apache.avro.file.CodecFactory;
> import org.apache.avro.file.DataFileWriter;
> import org.apache.avro.io.DatumWriter;
> import org.apache.avro.reflect.ReflectDatumWriter;
> import org.apache.avro.util.Utf8;
> import org.apache.flume.Context;
> import org.apache.flume.Event;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
>
> public class AvroEventSerializer implements EventSerializer {
>
>   private static final Logger logger =
>       LoggerFactory.getLogger(AvroEventSerializer.class);
>
>   public Context context;
>
>   public Map headers;
>   public Schema schema;
>
>   private DatumWriter writer = null;
>   private DataFileWriter dataFileWriter = null;
>
>   private final OutputStream out;
>
>   private AvroEventSerializer(OutputStream out) {
>     this.out = out;
>   }
>
>   public void configure(Context context) {
>     if (headers == null) {
>       // The schema comes from the event headers and is not available yet
>       return;
>     } else if (dataFileWriter != null) {
>       // The schema is known but the dataFileWriter was already created
>       return;
>     }
>     // From this point the code runs only once, after the schema has been
>     // received, so the dataFileWriter is created only once
>
>     writer = new ReflectDatumWriter(getSchema());
>     dataFileWriter = new DataFileWriter(writer);
>
>     int syncIntervalBytes =
>         context.getInteger(SYNC_INTERVAL_BYTES, DEFAULT_SYNC_INTERVAL_BYTES);
>     dataFileWriter.setSyncInterval(syncIntervalBytes);
>
>     String compressionCodec =
>         context.getString(COMPRESSION_CODEC, DEFAULT_COMPRESSION_CODEC);
>     try {
>       CodecFactory codecFactory = CodecFactory.fromString(compressionCodec);
>       dataFileWriter.setCodec(codecFactory);
>     } catch (AvroRuntimeException e) {
>       logger.warn("Unable to instantiate avro codec with name (" + compressionCodec
>           + "). Compression disabled. Exception follows.", e);
>     }
>   }
>
>   public Schema getSchema() {
>     if (schema == null) {
>       String schemaString;
>       // The header key may arrive as a String or as an Avro Utf8,
>       // depending on the source that produced the event
>       if (headers.keySet().toArray()[0].getClass() == String.class) {
>         String schemaHeaderKey = "flume.avro.schema.literal";
>         schemaString = headers.get(schemaHeaderKey).toString();
>       } else {
>         Utf8 schemaHeaderKey = new Utf8("flume.avro.schema.literal");
>         schemaString = headers.get(schemaHeaderKey).toString();
>       }
>       Schema.Parser parser = new Schema.Parser();
>       schema = parser.parse(schemaString);
>     }
>     return schema;
>   }
>
>   public OutputStream getOutputStream() {
>     return out;
>   }
>
>   public Event convert(Event event) {
>     headers = event.getHeaders();
>     this.configure(context);
>     return event;
>   }
>
>   public void write(Event event) throws IOException {
>     convert(event);
>     ByteBuffer buf = ByteBuffer.wrap(event.getBody());
>     // Need to reconfigure since the schema is not defined the first time
>     // configure() runs
>     this.configure(context);
>     afterCreate();
>     dataFileWriter.appendEncoded(buf);
>     // Uncomment below for running the test locally, comment for mvn build
>     // dataFileWriter.close();
>   }
>
>   public boolean afterCreateRun = false;
>
>   public void afterCreate() throws IOException {
>     if (afterCreateRun) {
>       return;
>     }
>     if (dataFileWriter != null) {
>       dataFileWriter.create(getSchema(), getOutputStream());
>       afterCreateRun = true;
>     }
>   }
>
>   public void afterReopen() throws IOException {
>     throw new UnsupportedOperationException("Avro API doesn't support append");
>   }
>
>   public void flush() throws IOException {
>     if (dataFileWriter != null) {
>       dataFileWriter.flush();
>     }
>   }
>
>   public void beforeClose() throws IOException {
>     // no-op
>   }
>
>   public boolean supportsReopen() {
>     return false;
>   }
>
>   public static class Builder implements EventSerializer.Builder {
>
>     public EventSerializer build(Context context, OutputStream out) {
>       AvroEventSerializer writer = new AvroEventSerializer(out);
>       writer.configure(context);
>       writer.context = context;
>       return writer;
>     }
>   }
> }
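For anyone wiring this up, a rough sketch of the matching HDFS sink configuration is below. This is only an illustration: it assumes the compiled class above is on the Flume agent's classpath, it reuses the agent, channel, sink, and {hdfs dir} placeholder names from the configuration at the bottom of the thread, and the serializer line follows the ClassName$Builder pattern Ed describes further down.

a1.sinks.sink1.type = hdfs
a1.sinks.sink1.channel = c1
a1.sinks.sink1.hdfs.path = {hdfs dir}
a1.sinks.sink1.hdfs.fileSuffix = .avro
a1.sinks.sink1.hdfs.fileType = DataStream
# Assumes the jar containing the custom serializer is on the agent's classpath
a1.sinks.sink1.serializer = org.apache.flume.serialization.AvroEventSerializer$Builder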
> On Feb 7, 2014, at 9:19 AM, Daniel Rodriguez <[email protected]> wrote:
>
> Hi Ed,
>
> Thanks for your response. I was afraid that the solution was to write my
> own serializer, and I'm not the most expert Java programmer :P
>
> But I think that is the only solution, reading more of the docs:
>
> *This deserializer is able to read an Avro container file, and it
> generates one event per Avro record in the file. Each event is annotated
> with a header that indicates the schema used. The body of the event is the
> binary Avro record data, not including the schema or the rest of the
> container file elements.*
>
> So I tested using deserializer.schemaType = LITERAL and I can see a JSON
> header with the schema, and in the body I can see the binary data of the
> values. So I think it should be "easy" to write a serializer based on an
> example I found:
> https://github.com/brockn/avro-flume-hive-example/blob/master/src/main/java/com/cloudera/flume/serialization/FlumeEventStringBodyAvroEventSerializer.java
>
> I was hoping that a general Avro serializer already existed, since there is
> a deserializer that I am using in the SpoolDir source.
>
> I will post if I come up with a solution.
>
> Thanks
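For context, the spooling directory source settings that produce that schema header are sketched below. This just combines the spoolDir configuration from the original message at the bottom of the thread with the deserializer.schemaType = LITERAL option mentioned above; {directory} remains a placeholder.

a1.sources.src1.type = spooldir
a1.sources.src1.channels = c1
a1.sources.src1.spoolDir = {directory}
a1.sources.src1.deserializer = avro
# LITERAL puts the full JSON schema into the flume.avro.schema.literal header
a1.sources.src1.deserializer.schemaType = LITERAL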
> On Feb 6, 2014, at 9:10 PM, ed <[email protected]> wrote:
>
> Hi Daniel,
>
> I think you will need to write a custom event serializer for the HDFS sink
> that extends AbstractAvroEventSerializer to write out your data using your
> specific Avro schema. Then in your agent configuration add it like this:
>
> a1.sinks.sink1.serializer = com.yourpackagename.CustomAvroEventSerializer$Builder
>
> As a quick test you can use the default Avro serializer
> (https://flume.apache.org/FlumeUserGuide.html#avro-event-serializer) like so:
>
> a1.sinks.sink1.serializer = avro_event
>
> I think this will end up just wrapping your Avro data in Flume's default
> schema, but at least you can see if valid Avro files are getting written to
> HDFS. Hope that gets you a little closer.
>
> Best,
>
> Ed
>
> On Fri, Feb 7, 2014 at 11:51 AM, Daniel Rodriguez <[email protected]> wrote:
>
>> Hi all,
>>
>> I have users writing Avro files on different servers, and I want to use
>> Flume to move all those files into HDFS so I can later use Hive or Pig to
>> query/analyse the data.
>>
>> On the client I installed Flume with a SpoolDir source and an Avro sink,
>> like this:
>>
>> a1.sources = src1
>> a1.sinks = sink1
>> a1.channels = c1
>>
>> a1.channels.c1.type = memory
>>
>> a1.sources.src1.type = spooldir
>> a1.sources.src1.channels = c1
>> a1.sources.src1.spoolDir = {directory}
>> a1.sources.src1.fileHeader = true
>> a1.sources.src1.deserializer = avro
>>
>> a1.sinks.sink1.type = avro
>> a1.sinks.sink1.channel = c1
>> a1.sinks.sink1.hostname = {IP}
>> a1.sinks.sink1.port = 41414
>>
>> On the Hadoop cluster I have this Avro source and HDFS sink:
>>
>> a1.sources = avro1
>> a1.sinks = sink1
>> a1.channels = c1
>>
>> a1.channels.c1.type = memory
>>
>> a1.sources.avro1.type = avro
>> a1.sources.avro1.channels = c1
>> a1.sources.avro1.bind = 0.0.0.0
>> a1.sources.avro1.port = 41414
>>
>> a1.sinks.sink1.type = hdfs
>> a1.sinks.sink1.channel = c1
>> a1.sinks.sink1.hdfs.path = {hdfs dir}
>> a1.sinks.sink1.hdfs.fileSuffix = .avro
>> a1.sinks.sink1.hdfs.rollSize = 67108864
>> a1.sinks.sink1.hdfs.fileType = DataStream
>>
>> The problem is that the files on HDFS are not valid Avro files! I am
>> using the Hue UI to check whether a file is a valid Avro file or not. If
>> I upload an Avro file that I generated on my PC to the cluster, I can see
>> its contents perfectly, and can even create a Hive table and query it, but
>> the files I send via Flume are not valid Avro files.
>>
>> I tried the avro client that is included with Flume, but it didn't work
>> because it sends one Flume event per line, breaking the Avro files, so I
>> fixed that by using the spooldir source with deserializer = avro. So I
>> think the problem is in the HDFS sink when it is writing the files.
>>
>> Using hdfs.fileType = DataStream it writes the values from the Avro
>> fields, not the whole Avro file, losing all the schema information. If I
>> use hdfs.fileType = SequenceFile the files are not valid for some reason.
>>
>> I appreciate any help.
>>
>> Thanks,
>>
>> Daniel
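To make the AbstractAvroEventSerializer approach from Ed's reply a bit more concrete, here is a minimal sketch of a schema-specific serializer. It is a hypothetical illustration rather than code from this thread: the package, class, and record names are invented, the schema is derived from a trivial record class via Avro's ReflectData, and it assumes the getSchema()/getOutputStream()/convert() hooks that Flume's bundled FlumeEventAvroEventSerializer and the FlumeEventStringBodyAvroEventSerializer example linked above also override.

// Hypothetical example; package, class, and record names are made up.
package com.example.flume.serialization;

import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

import org.apache.avro.Schema;
import org.apache.avro.reflect.ReflectData;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.serialization.AbstractAvroEventSerializer;
import org.apache.flume.serialization.EventSerializer;

public class CustomAvroEventSerializer
    extends AbstractAvroEventSerializer<CustomAvroEventSerializer.Record> {

  // Toy record type standing in for a real, application-specific schema.
  public static class Record {
    public long timestamp;
    public String message;
  }

  // Fixed schema derived from the record class via Avro reflection.
  private static final Schema SCHEMA = ReflectData.get().getSchema(Record.class);

  private final OutputStream out;

  private CustomAvroEventSerializer(OutputStream out) {
    this.out = out;
  }

  @Override
  public OutputStream getOutputStream() {
    return out;
  }

  @Override
  public Schema getSchema() {
    return SCHEMA;
  }

  @Override
  public Record convert(Event event) {
    // Map each Flume event onto the record; here the body is kept as a string.
    Record record = new Record();
    record.timestamp = System.currentTimeMillis();
    record.message = new String(event.getBody(), StandardCharsets.UTF_8);
    return record;
  }

  // Referenced from the agent configuration as
  // a1.sinks.sink1.serializer = com.example.flume.serialization.CustomAvroEventSerializer$Builder
  public static class Builder implements EventSerializer.Builder {
    @Override
    public EventSerializer build(Context context, OutputStream out) {
      CustomAvroEventSerializer writer = new CustomAvroEventSerializer(out);
      writer.configure(context);
      return writer;
    }
  }
}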
