Gonzalo's solution works. One note to avoid confusion: the body argument, which is a byte array, is what you read from Kafka, not the Flume event body. Naming it something like "message" would make that clearer.
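For example, the deserializer below in Gonzalo's mail could be written with the parameter renamed to "message". This is just a sketch of that renaming; like the original, it assumes the flume-ng-sdk and Avro jars are on the classpath:

```scala
import java.io.ByteArrayInputStream
import org.apache.avro.io.DecoderFactory
import org.apache.avro.specific.SpecificDatumReader
import org.apache.flume.source.avro.AvroFlumeEvent

// The argument is the raw record read from Kafka (an Avro-serialized
// AvroFlumeEvent), so calling it "message" avoids confusing it with the
// Flume event body extracted at the end.
val parseFlumeMessage = { message: Array[Byte] =>
  val reader = new SpecificDatumReader[AvroFlumeEvent](AvroFlumeEvent.getClassSchema)
  val decoder = DecoderFactory.get().directBinaryDecoder(new ByteArrayInputStream(message), null)
  val event = reader.read(null, decoder)   // unwrap the AvroFlumeEvent
  new String(event.getBody.array())        // the actual payload inside the event
}
```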
On Friday, November 6, 2015, Gonzalo Herreros <[email protected]> wrote:

> Hi Guillermo,
>
> If I understand correctly, you want Flume to write to Kafka as a channel
> and then Spark to read from Kafka. To do that you have two options:
>
> - Make Spark deserialize the FlumeEvent read from Kafka, for instance in
>   Scala:
>
>     val parseFlumeEvent = { body: Array[Byte] =>
>       val reader =
>         new SpecificDatumReader[AvroFlumeEvent](AvroFlumeEvent.getClassSchema)
>       val in = new ByteArrayInputStream(body)
>       val decoder =
>         org.apache.avro.io.DecoderFactory.get().directBinaryDecoder(in, null)
>       val event = reader.read(null, decoder)
>       new String(event.getBody().array())
>     }
>
> - Or set parseAsFlumeEvent=false and merge the fix for FLUME-2781 (or wait
>   for Flume 1.7.0 to be released). Without the fix, Flume will still
>   serialize events as Avro to Kafka regardless of the property.
>
> Alternatively, you can write to Kafka with a sink instead of a channel, so
> it doesn't wrap the message. But that adds some overhead, because then you
> also need a channel.
>
> Regards,
> Gonzalo
>
> On 6 November 2015 at 08:38, Guillermo Ortiz <[email protected]> wrote:
>
>> Hello,
>>
>> I'm using the KafkaChannel, and I have a doubt about the documentation:
>>
>> "Expecting Avro datums with FlumeEvent schema in the channel. This
>> should be true if Flume source is writing to the channel, and false if
>> other producers are writing into the topic that the channel is using.
>> Flume source messages to Kafka can be parsed outside of Flume by using
>> org.apache.flume.source.avro.AvroFlumeEvent provided by the
>> flume-ng-sdk artifact."
>>
>> In my PoC, I'm writing to my topic just with Flume, but I want to read
>> from the KafkaChannel with Spark. I have configured parseAsFlumeEvent
>> as true because only Flume writes to this topic.
>> What should I do to read the events from Kafka? Do I have to use
>> org.apache.flume.source.avro.AvroFlumeEvent? If I set the parameter
>> as false, how are the events inserted?

--
Thanks,
Hari
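For reference, Gonzalo's second option corresponds to a Kafka channel configuration along these lines. This is only a sketch: the agent name (a1), channel name (c1), hosts, and topic are placeholders, and the parseAsFlumeEvent=false setting only takes effect once the FLUME-2781 fix is in (Flume 1.7.0+):

```properties
# Hypothetical agent/channel names; Kafka channel with event wrapping disabled
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.brokerList = localhost:9092
a1.channels.c1.zookeeperConnect = localhost:2181
a1.channels.c1.topic = flume-channel
a1.channels.c1.parseAsFlumeEvent = false
```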
