Thank you ;)
2015-11-06 9:57 GMT+01:00 Hari Shreedharan &lt;[email protected]&gt;:

> Cornell's solution works. Just to avoid confusion: the body, which is a
> byte array, is what you read from Kafka, not the Flume event body. Name
> it something like `message` to avoid confusion.
>
>
> On Friday, November 6, 2015, Gonzalo Herreros &lt;[email protected]&gt; wrote:
>>
>> Hi Guillermo,
>>
>> If I understand correctly, you want Flume to write to Kafka as a channel
>> and then Spark to read from Kafka.
>> To do that you have two options:
>>
>> - Make Spark deserialize the FlumeEvent read from Kafka; for instance,
>> in Scala:
>>
>> import java.io.ByteArrayInputStream
>> import org.apache.avro.io.DecoderFactory
>> import org.apache.avro.specific.SpecificDatumReader
>> import org.apache.flume.source.avro.AvroFlumeEvent
>>
>> val parseFlumeEvent = { body: Array[Byte] =>
>>   val reader = new SpecificDatumReader[AvroFlumeEvent](AvroFlumeEvent.getClassSchema)
>>   val in = new ByteArrayInputStream(body)
>>   val decoder = DecoderFactory.get().directBinaryDecoder(in, null)
>>   val event = reader.read(null, decoder)
>>   new String(event.getBody().array())
>> }
>>
>> - Or set parseAsFlumeEvent=false and merge the fix for FLUME-2781 (or
>> wait for Flume 1.7.0 to be released). Without the fix, Flume will still
>> serialize events as Avro to Kafka regardless of the property.
>>
>> Alternatively, you can write to Kafka with a sink instead of a channel,
>> so it doesn't wrap the message. But that adds some overhead, because
>> then you need a channel as well.
>>
>> Regards,
>> Gonzalo
>>
>>
>> On 6 November 2015 at 08:38, Guillermo Ortiz &lt;[email protected]&gt; wrote:
>>>
>>> Hello,
>>>
>>> I'm using the KafkaChannel and I have a doubt about the documentation:
>>>
>>> "Expecting Avro datums with FlumeEvent schema in the channel. This
>>> should be true if Flume source is writing to the channel and false if
>>> other producers are writing into the topic that the channel is using.
>>> Flume source messages to Kafka can be parsed outside of Flume by using
>>> org.apache.flume.source.avro.AvroFlumeEvent provided by the
>>> flume-ng-sdk artifact."
>>>
>>> In my PoC I'm writing to my topic just with Kafka, but I want to read
>>> from the KafkaChannel with Spark. I have configured parseAsFlumeEvent
>>> as true because only Flume writes to this topic.
>>> What should I do to read the events from Kafka? Do I have to use
>>> org.apache.flume.source.avro.AvroFlumeEvent? If I set the parameter
>>> to false, how are the events inserted?
>
>
> --
> Thanks,
> Hari
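
To make the thread concrete, here is a minimal sketch of how Gonzalo's parseFlumeEvent deserializer could be wired into a Spark Streaming job that reads the Kafka channel's topic. Assumptions, not from the thread: Spark 1.x with the spark-streaming-kafka artifact and flume-ng-sdk on the classpath, and the broker address ("broker1:9092") and topic name ("channel-topic") are placeholders.

```scala
import java.io.ByteArrayInputStream

import kafka.serializer.{DefaultDecoder, StringDecoder}
import org.apache.avro.io.DecoderFactory
import org.apache.avro.specific.SpecificDatumReader
import org.apache.flume.source.avro.AvroFlumeEvent
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object FlumeKafkaChannelReader {
  // Gonzalo's function: unwrap the Avro-serialized FlumeEvent that the
  // Kafka channel writes and return its body as a String.
  val parseFlumeEvent = { body: Array[Byte] =>
    val reader = new SpecificDatumReader[AvroFlumeEvent](AvroFlumeEvent.getClassSchema)
    val decoder = DecoderFactory.get().directBinaryDecoder(new ByteArrayInputStream(body), null)
    val event = reader.read(null, decoder)
    new String(event.getBody().array())
  }

  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(new SparkConf().setAppName("flume-kafka-channel"), Seconds(5))
    val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")

    // Message values are raw byte arrays: each one is an AvroFlumeEvent
    // wrapper, not the original event body.
    val stream = KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](
      ssc, kafkaParams, Set("channel-topic"))

    stream.map { case (_, bytes) => parseFlumeEvent(bytes) }.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```

The key point from the thread is visible in the value decoder: with parseAsFlumeEvent=true, what arrives from Kafka is the Avro wrapper, so the bytes must go through AvroFlumeEvent before the original body is usable.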

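For Gonzalo's second option, a Kafka channel configured with parseAsFlumeEvent=false might look like the sketch below. The agent and channel names (a1, c1), broker, ZooKeeper address, and topic are hypothetical, and the property names are the Flume 1.6-era ones; check the Kafka Channel section of the Flume User Guide for your version.

```properties
a1.channels = c1
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.brokerList = broker1:9092
a1.channels.c1.zookeeperConnect = zk1:2181
a1.channels.c1.topic = channel-topic
# With the FLUME-2781 fix (or Flume 1.7.0), this makes the channel write the
# raw event body to Kafka instead of an Avro-wrapped FlumeEvent, so consumers
# such as Spark do not need flume-ng-sdk to read the messages.
a1.channels.c1.parseAsFlumeEvent = false
```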