Hi Jeong,
that is exactly the problem.
The right topic name (e.g. X) is present in the event headers. The Kafka sink is
working normally with my Thrift source.
I have a problem only when I have to use the spoolDir source. SpoolDir is
probably not able to read those headers, so instead of sending the event to
"X", it tries a default topic first ('default-flume-topic') and then fails,
as shown in the trace that I sent.
If I set the topic name in the Flume-ng agent.conf, then the spoolDir source
works as well,
e.g.: agent.sinks.kafka.topic = 'X'
but of course, I cannot go for this configuration, since I have several
different topics to work with.
Thanks a lot.
Simone Roselli
ITE Sysadmin
[email protected]
http://www.plista.com
----- Original Message -----
From: "Jeong-shik Jang" <[email protected]>
To: "user" <[email protected]>
Sent: Friday, May 20, 2016 11:16:17 AM
Subject: Re: Spooldir -> Kafka sink
Hi Simone,
I have a better understanding now; thanks for your explanation.
In that case, how about checking the key name in the headers? It is supposed
to be "topic".
User guide reads:
If the event header contains a “topic” field, the event will be published
to that topic overriding the topic configured here
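One way to make sure events coming from the spooling directory carry that header is a static interceptor on the spoolDir source. The fragment below is only a sketch under assumptions: the agent, source, channel and interceptor names (agent, spool, ch, i1) are hypothetical, and it assumes the spooled files hold only event bodies (which, as far as I know, is what the File Roll sink writes with its default text serializer), so the original headers are no longer there when the files are read back.

```properties
# Hypothetical names: agent "agent", source "spool", channel "ch", interceptor "i1".
agent.sources = spool
agent.sources.spool.type = spooldir
agent.sources.spool.spoolDir = /dir
agent.sources.spool.channels = ch

# Static interceptor: adds the same header to every event read by this source.
# The Kafka sink then publishes to the topic named in the "topic" header.
agent.sources.spool.interceptors = i1
agent.sources.spool.interceptors.i1.type = static
agent.sources.spool.interceptors.i1.key = topic
agent.sources.spool.interceptors.i1.value = MyTopic
```

A static interceptor sets a single value per source, so with several topics this approach would likely need one spool directory (and one spoolDir source) per topic.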
JS
2016-05-20 18:08 GMT+09:00 Simone Roselli <[email protected]>:
> Hi Jeong,
>
> thanks for your answer.
>
> I already have my topics in Kafka, I don't need to create new topics.
> Unfortunately, the problem is different here.
>
> Problem in one sentence:
>
> ** The Spooldir source is not able to successfully send events to my Kafka
> topic, if the topic name is not set in the agent.conf **
>
>
>
> Simone Roselli
> ITE Sysadmin
> [email protected]
> http://www.plista.com
>
> ----- Original Message -----
> From: "Jeong-shik Jang" <[email protected]>
> To: "user" <[email protected]>
> Sent: Friday, May 20, 2016 3:00:02 AM
> Subject: Re: Spooldir -> Kafka sink
>
> Hi Simone,
>
> How about starting by checking your Kafka configuration? The related
> property name, I think, is "auto.create.topics.enable".
>
> auto.create.topics.enable true Enable auto creation of topic on the server.
> If this is set to true then attempts to produce, consume, or fetch metadata
> for a non-existent topic will automatically create it with the default
> replication factor and number of partitions.
>
> The default value is true, so it is likely enabled, but just to make sure.
>
> JS
>
> 2016-05-19 22:37 GMT+09:00 Simone Roselli <[email protected]>:
>
> > Hello,
> >
> > I'm using 2 sinks (Kafka, Fileroll) in failover.
> >
> > If the Kafka sink is temporarily unreachable, the Fileroll sink takes over
> > and writes events to a local dir.
> >
> > Then, I configure a spoolDir source, for a directory /dir, pointing to the
> > Kafka sink.
> >
> > When I try to move an event from the local dir to the spool dir, the event
> > doesn't reach Kafka and I get this:
> >
> > """
> > 19 May 2016 15:12:39,007 WARN
> > [SinkRunner-PollingRunner-FailoverSinkProcessor]
> > (kafka.utils.Logging$class.warn:83) - Error while fetching metadata
> > [{TopicMetadata for topic default-flume-topic ->
> > No partition metadata for topic default-flume-topic due to
> > kafka.common.UnknownTopicOrPartitionException}] for topic
> > [default-flume-topic]: class kafka.common.UnknownTopicOrPartitionException
> >
> > 19 May 2016 15:12:39,007 ERROR
> > [SinkRunner-PollingRunner-FailoverSinkProcessor]
> > (kafka.utils.Logging$class.error:97) - Failed to collate messages by
> > topic, partition due to: Failed to fetch topic metadata for topic:
> > default-flume-topic
> >
> > 19 May 2016 15:12:39,007 INFO
> > [SinkRunner-PollingRunner-FailoverSinkProcessor]
> > (kafka.utils.Logging$class.info:68) - Back off for 100 ms before retrying
> > send. Remaining retries = 3
> >
> > 19 May 2016 15:12:39,108 INFO
> > [SinkRunner-PollingRunner-FailoverSinkProcessor]
> > (kafka.utils.Logging$class.info:68) - Fetching metadata from broker
> > id:1,host:broker01.doamain.com,port:9092 with correlation id 45270 for 2
> > topic(s) Set(MyTopic, default-flume-topic)
> >
> > ...
> >
> > 19 May 2016 15:12:39,433 ERROR
> > [SinkRunner-PollingRunner-FailoverSinkProcessor]
> > (kafka.utils.Logging$class.error:97) - Failed to send requests for topics
> > MyTopic,default-flume-topic with correlation ids in [xxx,xxx]
> >
> > """
> >
> > default-flume-topic = default Kafka topic used by the flume-ng Kafka sink
> > MyTopic = my actual target topic, present in the event headers
> >
> > In the agent.conf I didn't set any topic name, as topic names are
> > dynamically assigned. If I define a topic name in the agent.conf, then it
> > works.
> >
> >
> > Any clues?
> > Thanks
> >
> >
> >
> > Simone Roselli
> > ITE Sysadmin
> > [email protected]
> > http://www.plista.com
> >
>