Hi Simone, I see your point.
How about setting "sink.serializer = avro_event" on the file_roll sink, so that it keeps the full event, headers included, rather than the body only? Then, on the spooldir source, you could set "deserializer = AVRO" to recover the events. I am not sure it will work, as I have never tried it, but I mention it in case it makes sense to you.

JS

2016-05-20 18:48 GMT+09:00 Simone Roselli <[email protected]>:

> Hi Jeong,
>
> that is exactly the problem.
>
> The right topic name (e.g. X) is present in the event headers. The Kafka
> sink is working normally with my Thrift source.
>
> I have a problem only when I have to use the spoolDir source. SpoolDir is
> probably not able to read those headers and, instead of sending the event to
> "X", it tries a default topic first ('default-flume-topic') and then fails,
> as shown in the trace that I sent.
>
> If I write the topic name in the Flume-ng agent.conf, then the spoolDir
> works as well
>
> e.g.: agent.sinks.kafka.topic = 'X'
>
> but of course, I cannot go for this configuration, since I have several
> different topics to work with.
>
> Thanks a lot.
>
> Simone Roselli
> ITE Sysadmin
> [email protected]
> http://www.plista.com
>
> ----- Original Message -----
> From: "Jeong-shik Jang" <[email protected]>
> To: "user" <[email protected]>
> Sent: Friday, May 20, 2016 11:16:17 AM
> Subject: Re: Spooldir -> Kafka sink
>
> Hi Simone,
>
> I have a better understanding now; thanks for your explanation.
> In that case, how about checking the key name in the headers? It is supposed
> to be "topic".
>
> The user guide reads:
> If the event header contains a “topic” field, the event will be published
> to that topic overriding the topic configured here
>
> JS
>
> 2016-05-20 18:08 GMT+09:00 Simone Roselli <[email protected]>:
>
> > Hi Jeong,
> >
> > thanks for your answer.
> >
> > I already have my topics in Kafka, I don't need to create new topics.
> > Unfortunately, the problem is different here.
> >
> > Problem in one sentence:
> >
> > ** The Spooldir source is not able to successfully send events to my Kafka
> > topic, if the topic name is not set in the agent.conf **
> >
> > Simone Roselli
> > ITE Sysadmin
> > [email protected]
> > http://www.plista.com
> >
> > ----- Original Message -----
> > From: "Jeong-shik Jang" <[email protected]>
> > To: "user" <[email protected]>
> > Sent: Friday, May 20, 2016 3:00:02 AM
> > Subject: Re: Spooldir -> Kafka sink
> >
> > Hi Simone,
> >
> > How about starting by checking your Kafka configuration? The related
> > property name, I think, is "auto.create.topics.enable".
> >
> > auto.create.topics.enable (default: true): Enable auto creation of topic
> > on the server. If this is set to true then attempts to produce, consume,
> > or fetch metadata for a non-existent topic will automatically create it
> > with the default replication factor and number of partitions.
> >
> > The default value is true, so it is likely enabled, but just to make sure.
> >
> > JS
> >
> > 2016-05-19 22:37 GMT+09:00 Simone Roselli <[email protected]>:
> >
> > > Hello,
> > >
> > > I'm using 2 sinks (Kafka, Fileroll) in failover.
> > >
> > > If the Kafka sink is temporarily unreachable, the Fileroll takes over and
> > > writes events to a local dir.
> > >
> > > Then, I configure a spoolDir source, for a directory /dir, pointing to the
> > > Kafka sink.
> > >
> > > When I try to move an event from the local dir to the spool dir, the event
> > > doesn't reach Kafka and I get this:
> > >
> > > """
> > > 9 May 2016 15:12:39,007 WARN
> > > [SinkRunner-PollingRunner-FailoverSinkProcessor]
> > > (kafka.utils.Logging$class.warn:83) - Error while fetching metadata
> > > [{TopicMetadata for topic default-flume-topic ->
> > > No partition metadata for topic default-flume-topic due to
> > > kafka.common.UnknownTopicOrPartitionException}] for topic
> > > [default-flume-topic]: class kafka.common.UnknownTopicOrPartitionException
> > >
> > > 19 May 2016 15:12:39,007 ERROR
> > > [SinkRunner-PollingRunner-FailoverSinkProcessor]
> > > (kafka.utils.Logging$class.error:97) - Failed to collate messages by
> > > topic, partition due to: Failed to fetch topic metadata for topic:
> > > default-flume-topic
> > >
> > > 19 May 2016 15:12:39,007 INFO
> > > [SinkRunner-PollingRunner-FailoverSinkProcessor]
> > > (kafka.utils.Logging$class.info:68) - Back off for 100 ms before retrying
> > > send. Remaining retries = 3
> > >
> > > 19 May 2016 15:12:39,108 INFO
> > > [SinkRunner-PollingRunner-FailoverSinkProcessor]
> > > (kafka.utils.Logging$class.info:68) - Fetching metadata from broker id:1,host:
> > > broker01.doamain.com,port:9092 with correlation id 45270 for 2 topic(s)
> > > Set(MyTopic, default-flume-topic)
> > >
> > > ...
> > >
> > > 19 May 2016 15:12:39,433 ERROR
> > > [SinkRunner-PollingRunner-FailoverSinkProcessor]
> > > (kafka.utils.Logging$class.error:97) - Failed to send requests for topics
> > > MyTopic,default-flume-topic with correlation ids in [xxx,xxx]
> > >
> > > """
> > >
> > > default-flume-topic = Kafka topic used by the flume-ng Kafka sink
> > > MyTopic = my actual target topic, present in the event headers
> > >
> > > In the agent.conf I didn't set any topic name, as topic names are
> > > dynamically assigned. If I define a topic name in the agent.conf, then it
> > > works.
> > >
> > > Any clues?
> > > Thanks
> > >
> > > Simone Roselli
> > > ITE Sysadmin
> > > [email protected]
> > > http://www.plista.com
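
For reference, the serializer/deserializer idea from the reply at the top of this thread could be written in agent.conf roughly as below. This is an untested sketch, not a confirmed fix. The agent name "agent", the component names "fileroll" and "spool", and the directory /var/flume/failover are placeholders that do not come from the thread; only /dir, "sink.serializer = avro_event" and "deserializer = AVRO" are taken from the messages above. Channel bindings and the failover sink group are left out.

```properties
# Failover sink: write whole events (headers + body) as Avro files
# instead of the default body-only text output.
# "fileroll" and /var/flume/failover are placeholder names.
agent.sinks.fileroll.type = file_roll
agent.sinks.fileroll.sink.directory = /var/flume/failover
agent.sinks.fileroll.sink.serializer = avro_event

# Replay source: read the Avro files back from the spool directory.
# "spool" is a placeholder name; /dir is the directory from the thread.
agent.sources.spool.type = spooldir
agent.sources.spool.spoolDir = /dir
agent.sources.spool.deserializer = AVRO
```

Whether the "topic" header survives this round trip would still need checking: as far as the Flume user guide describes it, the AVRO deserializer emits each Avro record as the event body, so the original headers may not come back as Flume headers without further processing.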
