[ 
https://issues.apache.org/jira/browse/FLUME-2942?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15372573#comment-15372573
 ] 

Sebastian Alfers edited comment on FLUME-2942 at 12/20/16 9:39 AM:
-------------------------------------------------------------------

Hi [~mpercy], thanks for your reply.

This is our config:

# AGENT SETTINGS
agent1.channels = ch1
agent1.sources = thriftSrc spool
agent1.sinks = kafka fileroll
agent1.sinkgroups = g1

# MEMORY CHANNEL
agent1.channels.ch1.type = memory
agent1.channels.ch1.capacity = 10000
agent1.channels.ch1.transactionCapacity = 500

# THRIFT (source)
agent1.sources.thriftSrc.type = thrift
agent1.sources.thriftSrc.channels = ch1
agent1.sources.thriftSrc.bind = 0.0.0.0
agent1.sources.thriftSrc.port = 4042

# SPOOLDIR (source)
agent1.sources.spool.type = spooldir
agent1.sources.spool.channels = ch1
agent1.sources.spool.spoolDir = /opt/flume-ng/failover/spool
agent1.sources.spool.fileHeader = true

agent1.sources.spool.deserializer = AVRO
agent1.sources.thriftSrc.threads = 150

# KAFKA (sink)
agent1.sinks.kafka.channel = ch1
agent1.sinks.kafka.type = org.apache.flume.sink.kafka.KafkaSink
agent1.sinks.kafka.batchSize = 50
agent1.sinks.kafka.brokerList = host-a:9092,host-b:9092
#agent1.sinks.kafka.topic = HPTStream.raw


# FILE ROLL (failover sink)
agent1.sinks.fileroll.type = file_roll
agent1.sinks.fileroll.channel = ch1
agent1.sinks.fileroll.sink.directory = /opt/flume-ng/failover/data
agent1.sinks.fileroll.sink.serializer = avro_event

# FAILOVER GROUP
agent1.sinkgroups.g1.sinks = kafka fileroll
agent1.sinkgroups.g1.processor.type = failover
agent1.sinkgroups.g1.processor.priority.kafka = 10
agent1.sinkgroups.g1.processor.priority.fileroll = 5
agent1.sinkgroups.g1.processor.maxpenalty = 10000

Please look at the agent1.sources.spool.deserializer setting; it corresponds to the code referenced above.

There, we apply the fix by replacing the built-in AVRO shorthand with the FQCN of our patched deserializer.
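Concretely, the change in the config amounts to one line; the class name below is a hypothetical placeholder, not our actual patched class:

```properties
# Built-in shorthand, which drops the record's headers:
#agent1.sources.spool.deserializer = AVRO
# Point the spool source at the patched deserializer's Builder instead
# (com.example.flume.HeaderAwareAvroEventDeserializer is a placeholder name):
agent1.sources.spool.deserializer = com.example.flume.HeaderAwareAvroEventDeserializer$Builder
```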



> AvroEventDeserializer ignores header from spool source
> ------------------------------------------------------
>
>                 Key: FLUME-2942
>                 URL: https://issues.apache.org/jira/browse/FLUME-2942
>             Project: Flume
>          Issue Type: Bug
>    Affects Versions: v1.6.0
>            Reporter: Sebastian Alfers
>
> I have a spool file source and use Avro for de-/serialization.
> In detail, serialized events store the topic of the Kafka sink in the header.
> When I load the events from the spool directory, the headers are ignored. 
> Please see: 
> https://github.com/apache/flume/blob/caa64a1a6d4bc97be5993cb468516e9ffe862794/flume-ng-core/src/main/java/org/apache/flume/serialization/AvroEventDeserializer.java#L122
> As you can see, it uses the whole event as the body but does not distinguish 
> between the header and body encoded by Avro.
> Please verify that this is a bug.
> I fixed this by using the record, which stores header and body separately.
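The fix described in the quoted issue (building the event from the record's separate header and body fields, rather than treating the whole datum as the body) can be sketched roughly as follows. The class and field names are simplified stand-ins for illustration, not the real Flume/Avro types:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Simplified stand-in for the AvroFlumeEvent record: a headers map
// (e.g. carrying the kafka topic) plus a body buffer.
class AvroFlumeEventRecord {
    final Map<String, String> headers;
    final ByteBuffer body;
    AvroFlumeEventRecord(Map<String, String> headers, ByteBuffer body) {
        this.headers = headers;
        this.body = body;
    }
}

// Simplified stand-in for a Flume Event.
class SimpleEvent {
    final Map<String, String> headers = new HashMap<>();
    byte[] body;
}

public class HeaderAwareDeserializerSketch {

    // Roughly the buggy behavior: the entire encoded datum becomes the
    // event body, so any headers stored inside the record are lost.
    static SimpleEvent readEventIgnoringHeaders(byte[] wholeDatum) {
        SimpleEvent e = new SimpleEvent();
        e.body = wholeDatum;
        return e;
    }

    // The fixed behavior: copy the record's headers onto the Flume event
    // (so e.g. the "topic" header survives the spool round-trip) and use
    // only the record's body field as the event body.
    static SimpleEvent readEventWithHeaders(AvroFlumeEventRecord record) {
        SimpleEvent e = new SimpleEvent();
        e.headers.putAll(record.headers);
        ByteBuffer buf = record.body.duplicate();
        e.body = new byte[buf.remaining()];
        buf.get(e.body);
        return e;
    }

    public static void main(String[] args) {
        Map<String, String> h = new HashMap<>();
        h.put("topic", "HPTStream.raw");
        AvroFlumeEventRecord rec = new AvroFlumeEventRecord(
                h, ByteBuffer.wrap("payload".getBytes(StandardCharsets.UTF_8)));
        SimpleEvent fixed = readEventWithHeaders(rec);
        System.out.println(fixed.headers.get("topic"));                      // HPTStream.raw
        System.out.println(new String(fixed.body, StandardCharsets.UTF_8));  // payload
    }
}
```

With the buggy path the "topic" header never reaches the Kafka sink, which is why the commented-out static topic in the config above was needed as a workaround.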



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
