[ https://issues.apache.org/jira/browse/FLUME-2942?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15372573#comment-15372573 ]
Sebastian Alfers edited comment on FLUME-2942 at 12/20/16 9:39 AM:
-------------------------------------------------------------------
Hi [~mpercy], thanks for your reply. This is our config:

# AGENT SETTINGS
agent1.channels = ch1
agent1.sources = thriftSrc spool
agent1.sinks = kafka fileroll
agent1.sinkgroups = g1

# MEMORY CHANNEL
agent1.channels.ch1.type = memory
agent1.channels.ch1.capacity = 10000
agent1.channels.ch1.transactionCapacity = 500

# THRIFT (source)
agent1.sources.thriftSrc.type = thrift
agent1.sources.thriftSrc.channels = ch1
agent1.sources.thriftSrc.bind = 0.0.0.0
agent1.sources.thriftSrc.port = 4042
agent1.sources.thriftSrc.threads = 150

# SPOOLDIR (source)
agent1.sources.spool.type = spooldir
agent1.sources.spool.channels = ch1
agent1.sources.spool.spoolDir = /opt/flume-ng/failover/spool
agent1.sources.spool.fileHeader = true
agent1.sources.spool.deserializer = AVRO

# KAFKA (sink)
agent1.sinks.kafka.channel = ch1
agent1.sinks.kafka.type = org.apache.flume.sink.kafka.KafkaSink
agent1.sinks.kafka.batchSize = 50
agent1.sinks.kafka.brokerList = host-a:9092,host-b:9092
#agent1.sinks.kafka.topic = HPTStream.raw

# FILE ROLL (failover sink)
agent1.sinks.fileroll.type = file_roll
agent1.sinks.fileroll.channel = ch1
agent1.sinks.fileroll.sink.directory = /opt/flume-ng/failover/data
agent1.sinks.fileroll.sink.serializer = avro_event

# FAILOVER GROUP
agent1.sinkgroups.g1.sinks = kafka fileroll
agent1.sinkgroups.g1.processor.type = failover
agent1.sinkgroups.g1.processor.priority.kafka = 10
agent1.sinkgroups.g1.processor.priority.fileroll = 5
agent1.sinkgroups.g1.processor.maxpenalty = 10000

Please look at the agent1.sources.spool.deserializer setting: that is the property the fix referenced above applies to. In our build, we point it at the FQCN of our patched deserializer.
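For clarity, the relevant line of the config above, and how it would change once a patched deserializer is plugged in via its builder FQCN (the class name below is hypothetical, for illustration only; the stock AVRO shorthand resolves to the built-in AvroEventDeserializer):

```properties
# Stock shorthand, resolving to the built-in AvroEventDeserializer:
agent1.sources.spool.deserializer = AVRO
# With a patched deserializer, the shorthand is replaced by a builder FQCN
# (hypothetical class name, for illustration only):
# agent1.sources.spool.deserializer = com.example.flume.HeaderPreservingAvroDeserializer$Builder
```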
> AvroEventDeserializer ignores header from spool source
> ------------------------------------------------------
>
>                 Key: FLUME-2942
>                 URL: https://issues.apache.org/jira/browse/FLUME-2942
>             Project: Flume
>          Issue Type: Bug
>    Affects Versions: v1.6.0
>            Reporter: Sebastian Alfers
>
> I have a spool file source and use Avro for de-/serialization.
> In detail, serialized events store the topic of the Kafka sink in the header.
> When I load the events from the spool directory, the headers are ignored.
> Please see:
> https://github.com/apache/flume/blob/caa64a1a6d4bc97be5993cb468516e9ffe862794/flume-ng-core/src/main/java/org/apache/flume/serialization/AvroEventDeserializer.java#L122
> As you can see, it uses the whole record as the event body and does not distinguish between the headers and the body encoded in Avro.
> Please verify that this is a bug.
> I fixed this by using the record fields that store the headers and the body separately.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
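The fix the reporter describes can be sketched as follows. This is a minimal illustration, not the actual patch: it assumes the spooled files were written by Flume's avro_event serializer, whose schema stores headers and body as separate record fields, and it mocks the Avro GenericRecord as a plain Map so the example is self-contained. The class and method names are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Sketch of a header-preserving deserialization step. Assumption: the spooled
// record follows the avro_event layout, with separate "headers" and "body"
// fields; the Avro GenericRecord is stood in for by a plain Map here.
public class HeaderPreservingDeserializer {

    // Minimal stand-in for Flume's Event interface.
    public static final class SimpleEvent {
        public final Map<String, String> headers;
        public final byte[] body;
        SimpleEvent(Map<String, String> headers, byte[] body) {
            this.headers = headers;
            this.body = body;
        }
    }

    // Instead of treating the whole serialized record as the event body
    // (which is where the headers get lost), read the record's "headers"
    // and "body" fields separately and carry both into the event.
    @SuppressWarnings("unchecked")
    public static SimpleEvent toEvent(Map<String, Object> record) {
        Map<String, String> headers =
            (Map<String, String>) record.getOrDefault("headers", new HashMap<>());
        byte[] body = (byte[]) record.get("body");
        return new SimpleEvent(headers, body);
    }

    public static void main(String[] args) {
        // A record as the avro_event serializer would have laid it out:
        Map<String, Object> record = new HashMap<>();
        Map<String, String> headers = new HashMap<>();
        headers.put("topic", "HPTStream.raw"); // Kafka topic carried in the header
        record.put("headers", headers);
        record.put("body", "payload".getBytes(StandardCharsets.UTF_8));

        SimpleEvent event = toEvent(record);
        System.out.println(event.headers.get("topic")); // prints "HPTStream.raw"
    }
}
```

With this split, a downstream Kafka sink can again resolve the destination topic from the event header, which is exactly what breaks when the stock deserializer folds the entire record into the body.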