TimeoutExceptions during commitSync and committed

2020-09-22 Thread Sergio Soria

Hi,

We have a Consumer that occasionally catches a TimeoutException when 
trying to commit an offset after polling. Since it's a 
RetriableException, the Consumer tries to roll back and read from the 
last committed offset. However, when trying to fetch the last committed 
offset with committed(), it throws another TimeoutException.
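
Roughly, our poll/commit/rollback loop looks like the following sketch
(config values, topic name, and the seek fallback are illustrative, not
our exact code):

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.serialization.StringDeserializer;

public class CommitWithRollback {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");           // placeholder
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, "120000"); // bounds commitSync()/committed()

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("example-topic")); // placeholder
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                // ... process records ...
                try {
                    consumer.commitSync();
                } catch (TimeoutException e) {
                    // Retriable: rewind to the last committed offset and re-consume.
                    for (TopicPartition tp : consumer.assignment()) {
                        // committed() can itself throw TimeoutException, which is
                        // exactly the behavior we are seeing.
                        OffsetAndMetadata committed = consumer.committed(tp);
                        if (committed != null) {
                            consumer.seek(tp, committed.offset());
                        } else {
                            consumer.seekToBeginning(Collections.singleton(tp)); // no committed offset yet
                        }
                    }
                }
            }
        }
    }
}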


[Logs from committed()]:

org.apache.kafka.common.errors.TimeoutException: Timeout of 6ms 
expired before the last committed offset for partition  
could be determined.


[Logs from commitSync()]:

Offset commit failed on partition  at offset : 
The coordinator is loading and hence can't process requests.


Offset commit failed on partition  at offset : 
The coordinator is loading and hence can't process requests.


Offset commit failed on partition  at offset : 
The coordinator is loading and hence can't process requests.


...

I've tried changing default.api.timeout.ms to two minutes with no luck, 
still getting the same behavior after a total of four minutes.


1. Does this indicate that the Broker is reloading its cache?
2. What could cause the Coordinator to be in a loading state after a poll?
3. Is there anything in our Consumer that could cause this behavior,
   e.g. polling intervals?
4. Is there anything in our Broker that could cause this behavior, e.g.
   re-balancing?
5. If it is reloading its cache, shouldn't this be avoided by the
   heartbeats sent from the Consumer?

Kind regards,

Sergio



Consumer TimeoutException

2020-09-22 Thread Navneeth Krishnan
Hi All,

I'm frequently getting the below error in one of the application consumers.
From the error, what I can infer is that the offset commit failed due to a
timeout after 30 seconds. One suggestion was to increase the timeout, but I
think that would just extend the time period. What would be a good way to
handle this?

*Note:* The consumer has auto commit disabled and after every poll
commitAsync is performed.

failed org.apache.kafka.clients.consumer.RetriableCommitFailedException:
Offset commit failed with a retriable exception. You should retry
committing the latest consumed offsets.
Caused by: org.apache.kafka.common.errors.TimeoutException: Failed to send
request after 3 ms.
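
For reference, the consumer loop is roughly like the following sketch
(simplified; the callback handling shown is one common way to treat
retriable commit failures, not necessarily what we do today — all names
are illustrative):

import java.time.Duration;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.RetriableCommitFailedException;

public class AsyncCommitLoop {
    private volatile boolean running = true;

    void run(KafkaConsumer<String, String> consumer) {
        try {
            while (running) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                // ... process records ...
                consumer.commitAsync((offsets, exception) -> {
                    if (exception instanceof RetriableCommitFailedException) {
                        // Usually safe to ignore: a later commit will cover newer offsets anyway.
                        System.err.println("Retriable commit failure: " + exception.getMessage());
                    } else if (exception != null) {
                        System.err.println("Non-retriable commit failure: " + exception);
                    }
                });
            }
        } finally {
            try {
                consumer.commitSync(); // final blocking commit, retries up to default.api.timeout.ms
            } finally {
                consumer.close();
            }
        }
    }
}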

Thanks


How to create dynamic listeners by reading configuration details in plugin.conf file

2020-09-22 Thread srinivasa bs
The set of topics to listen to should be configured in a plugin.conf file; 
instead of a single ConsumerListener, there should be multiple such listeners, 
all dynamically started based on the configuration in the plugin.conf file.
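
A rough sketch of one way this could look, assuming plugin.conf is a plain
properties file with a comma-separated "topics" entry (bootstrap servers,
group ids, and all other names here are illustrative assumptions):

import java.io.FileInputStream;
import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class DynamicListeners {
    public static void main(String[] args) throws IOException {
        // Assumes plugin.conf contains a line like: topics=orders,payments,audit
        Properties conf = new Properties();
        try (FileInputStream in = new FileInputStream("plugin.conf")) {
            conf.load(in);
        }
        for (String raw : conf.getProperty("topics", "").split(",")) {
            String topic = raw.trim();
            if (!topic.isEmpty()) {
                new Thread(() -> listen(topic), "listener-" + topic).start();
            }
        }
    }

    private static void listen(String topic) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "plugin-" + topic);         // one group per listener
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList(topic));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                records.forEach(r -> System.out.println(topic + ": " + r.value()));
            }
        }
    }
}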


Re: Trigger topic compaction before uploading to S3

2020-09-22 Thread Ricardo Ferreira
These properties can't be triggered programmatically. Kafka uses an
internal pool of log cleaner threads that does the job
asynchronously: deleting old segments ("delete") and removing
repeated records ("compact").
Whatever the S3 connector picks up is already compacted and/or deleted.
— Ricardo
On Tue, 2020-09-22 at 11:50 +0200, Daniel Kraus wrote:
> Hi,
> I have a KStreams app that outputs a KTable to a topic with cleanup
> policy "compact,delete".
> I have the Confluent S3 Connector to store this table in S3 where I do
> further analysis with Hive.
> Now my question is, if there's a way to trigger log compaction right
> before the S3 Connector reads the data so I store less data in S3
> than when it simply copies all data from the stream?
> Thanks, Daniel
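
That said, topic-level settings can make the log cleaner run more eagerly
so the connector sees mostly compacted data. A rough sketch using the
AdminClient — the topic name and values are illustrative, and more
aggressive settings add load on the cleaner:

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class TuneCompaction {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder

        try (AdminClient admin = AdminClient.create(props)) {
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "ktable-output"); // placeholder
            Collection<AlterConfigOp> ops = Arrays.asList(
                // Compact as soon as 1% of the log is "dirty" (default is 50%).
                new AlterConfigOp(new ConfigEntry("min.cleanable.dirty.ratio", "0.01"), AlterConfigOp.OpType.SET),
                // Roll segments more often so recent data becomes eligible for compaction sooner.
                new AlterConfigOp(new ConfigEntry("segment.ms", "600000"), AlterConfigOp.OpType.SET),
                // Upper bound on how long a record may stay uncompacted (Kafka 2.3+).
                new AlterConfigOp(new ConfigEntry("max.compaction.lag.ms", "3600000"), AlterConfigOp.OpType.SET)
            );
            admin.incrementalAlterConfigs(Collections.singletonMap(topic, ops)).all().get();
        }
    }
}

The same properties can be set with kafka-configs.sh; either way compaction
remains asynchronous, so there is no hard guarantee the connector never
reads an uncompacted record.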


Re: Not able to connect to bootstrap server when one broker down

2020-09-22 Thread Prateek Rajput
Hi Manoj,
Thanks, but we found the issue: it was most probably happening because the
wrong jar was being picked up from HDFS and set on the Oozie
classpath at runtime. In our code kafka-client is on 2.3, but while running
the MR job a 0.8.2.0 jar was being picked up. We caught it after seeing the
producer client logs in our application; the kafka-client version was
different there.
After removing that old jar, we have not observed this issue since.

*Regards,*
*Prateek Rajput* 


On Wed, Aug 26, 2020 at 9:55 AM  wrote:

> What error are you getting? Can you share the exact error?
> What is the version of the Kafka lib on the client side?
>
> On 8/25/20, 7:50 AM, "Prateek Rajput" 
> wrote:
>
> [External]
>
>
> Hi, please if anyone can help, will be a huge favor.
>
> *Regards,*
> *Prateek Rajput* 
>
>
> On Tue, Aug 25, 2020 at 12:06 AM Prateek Rajput <
> prateek.raj...@flipkart.com>
> wrote:
>
> > Hi everyone,
> > I am new to Kafka, and recently started working on Kafka in my company.
> > We recently migrated our client and cluster from the *0.10.x* version to
> > *2.3.0*. I am facing this issue quite often.
> > I have provided all brokers in the *bootstrap.servers* config to
> > instantiate the producer client, but while using this client for batch
> > publishing, sometimes some of my mappers get stuck.
> > I debugged and found that one broker was down (for some maintenance
> > activity). It was getting stuck because the mapper's client happened to
> > pick that node for its very first connection, and it was failing with a
> > NoRouteToHostException.
> > I have read that the very first time the client will select a random
> > broker and try to connect to it to get the metadata of the whole
> > cluster. Is there any way that, on such exceptions, it can switch to
> > another node dynamically instead of trying to connect to the same box
> > again and again?
> >
> > *Regards,*
> > *Prateek Rajput* 
> >
>
> --

Trigger topic compaction before uploading to S3

2020-09-22 Thread Daniel Kraus
Hi,

I have a KStreams app that outputs a KTable
to a topic with cleanup policy "compact,delete".

I have the Confluent S3 Connector to store this
table in S3 where I do further analysis with hive.

Now my question is, if there's a way to trigger
log compaction right before the S3 Connector
reads the data so I store less data in S3 than
when it simply copies all data from the stream?

Thanks,
  Daniel


Re: Kafka streams - how to handle application level exception in event processing

2020-09-22 Thread Pushkar Deole
Thank you Gilles..will take a look..

Bruno, thanks for your elaborate explanation as well... however it
basically exposes my application to certain issues..

e.g. the application deals with agent states of a call center, where
the order of processing is important. When an agent is logged in, he
keeps rotating between Ready and Not Ready states, and at the end of the
day he becomes Logged Out... If, while processing the Ready event, there is
some temporary issue with the database/network and the event processing gets
an exception, the application does a few retries but with no luck.
As per Kafka polling, it will go ahead and poll the next record from the
partition for the same agent (since the agent id is the key) and it will
process the Logged Out event. So, does this mean I lost the Ready event in
between due to the database issue? Even if I store this event somewhere for
processing it later, processing the Ready event after Logged Out doesn't make
sense, since the order of states is important. Is my understanding correct?

On Tue, Sep 22, 2020 at 1:32 PM Gilles Philippart
 wrote:

> Hi Pushkar,
>
> Uber has written about how they deal with failures and reprocessing here,
> it might help you achieve what you describe:
> https://eng.uber.com/reliable-reprocessing/.
>
> Unfortunately, there isn't much written documentation about those patterns.
> There's also a good talk from Confluent's Antony Stubbs on how you can do
> certain things with the Processor API that you can't do with the Kafka
> Streams DSL:
>
> https://www.confluent.io/kafka-summit-lon19/beyond-dsl-unlocking-power-kafka-streams-processor-api
> .
>
> Gilles Philippart
> Funding Circle Engineering
>
> On Tue, 22 Sep 2020 at 08:12, Bruno Cadonna  wrote:
>
> > Hi Pushkar,
> >
> > I think there is a misunderstanding. If a consumer polls from a
> > partition, it will always poll the next event independently whether the
> > offset was committed or not. Committed offsets are used for fault
> > tolerance, i.e., when a consumer crashes, the consumer that takes over
> > the work of the crashed consumer will start polling record from the
> > offset the crashed consumer committed last. This is not only true for
> > Kafka Streams, but for all applications that use a Kafka consumer with
> > subscription.
> >
> > To be clear, my proposal is not a workaround. This is one approach to
> > solve your problem in Kafka Streams. You could have a look into
> > stream-stream joins if you can use a stream instead of a global table.
> > Another approach would be to use a plain Kafka consumer instead of Kafka
> > Stream with which you would have a more fine-grained control about polls
> > and commits. In any case, be aware that blocking processing on an event
> > indefinitely may result in your lag and/or your state growing
> > indefinitely.
> >
> > If you think there is something missing in Kafka Streams, you are very
> > welcome to search through the tickets in
> > https://issues.apache.org/jira/projects/KAFKA/issues and comment on
> > tickets that would solve your issue or create a new one if you cannot
> > find any.
> >
> > Best,
> > Bruno
> >
> > On 22.09.20 05:09, Pushkar Deole wrote:
> > > Bruno,
> > >
> > > So, essentially, we are just waiting on the processing of first event
> > that
> > > got an error before going ahead on to the next one.
> > >
> > > Second, if application handles storing the events in state store for
> > retry,
> > > Kafka stream would essentially commit the offset of those events, so
> next
> > > event will be polled by consumer, correct?
> > >
> > > Instead of this work around, is there any provision in kafka streams
> for
> > > this scenario? e.g. in case application registers application level
> > > exceptions then kafka streams will take care of it and do all this
> > > internally, and will not commit the offset of that event and hence will
> > > keep polling the same event again?
> > > Since this is a common scenario, using a particular configuration for
> > users
> > > can achieve this in Kafka streams internally?
> > >
> > >
> > > On Mon, Sep 21, 2020 at 9:01 PM Bruno Cadonna 
> > wrote:
> > >
> > >> Hi Pushkar,
> > >>
> > >> If you want to keep the order, you could still use the state store I
> > >> suggested in my previous e-mail and implement a queue on top of it.
> For
> > >> that you need to put the events into the store with a key that
> > >> represents the arrival order of the events. Each time a record is
> > >> received from the input topic, the events are read in arrival order
> from
> > >> the state store and the data in the global table is probed. If an
> event
> > >> matches data from the global table the event is removed from the state
> > >> store and emitted. If an event does not match data from the global
> table
> > >> the processing is stopped and nothing is emitted.
> > >>
> > >> Best,
> > >> Bruno
> > >>
> > >> On 21.09.20 14:21, Pushkar Deole wrote:
> > >>> Bruno,
> > >>>
> > >>> 1. the loading of topic mapped to GlobalKTable is done by some other
> > >>> service/appli

Re: Kafka streams - how to handle application level exception in event processing

2020-09-22 Thread Gilles Philippart
Hi Pushkar,

Uber has written about how they deal with failures and reprocessing here,
it might help you achieve what you describe:
https://eng.uber.com/reliable-reprocessing/.

Unfortunately, there isn't much written documentation about those patterns.
There's also a good talk from Confluent's Antony Stubbs on how you can do
certain things with the Processor API that you can't do with the Kafka
Streams DSL:
https://www.confluent.io/kafka-summit-lon19/beyond-dsl-unlocking-power-kafka-streams-processor-api
.

Gilles Philippart
Funding Circle Engineering

On Tue, 22 Sep 2020 at 08:12, Bruno Cadonna  wrote:

> Hi Pushkar,
>
> I think there is a misunderstanding. If a consumer polls from a
> partition, it will always poll the next event independently whether the
> offset was committed or not. Committed offsets are used for fault
> tolerance, i.e., when a consumer crashes, the consumer that takes over
> the work of the crashed consumer will start polling record from the
> offset the crashed consumer committed last. This is not only true for
> Kafka Streams, but for all applications that use a Kafka consumer with
> subscription.
>
> To be clear, my proposal is not a workaround. This is one approach to
> solve your problem in Kafka Streams. You could have a look into
> stream-stream joins if you can use a stream instead of a global table.
> Another approach would be to use a plain Kafka consumer instead of Kafka
> Stream with which you would have a more fine-grained control about polls
> and commits. In any case, be aware that blocking processing on an event
> indefinitely may result in your lag and/or your state growing
> indefinitely.
>
> If you think there is something missing in Kafka Streams, you are very
> welcome to search through the tickets in
> https://issues.apache.org/jira/projects/KAFKA/issues and comment on
> tickets that would solve your issue or create a new one if you cannot
> find any.
>
> Best,
> Bruno
>
> On 22.09.20 05:09, Pushkar Deole wrote:
> > Bruno,
> >
> > So, essentially, we are just waiting on the processing of first event
> that
> > got an error before going ahead on to the next one.
> >
> > Second, if application handles storing the events in state store for
> retry,
> > Kafka stream would essentially commit the offset of those events, so next
> > event will be polled by consumer, correct?
> >
> > Instead of this work around, is there any provision in kafka streams for
> > this scenario? e.g. in case application registers application level
> > exceptions then kafka streams will take care of it and do all this
> > internally, and will not commit the offset of that event and hence will
> > keep polling the same event again?
> > Since this is a common scenario, using a particular configuration for
> users
> > can achieve this in Kafka streams internally?
> >
> >
> > On Mon, Sep 21, 2020 at 9:01 PM Bruno Cadonna 
> wrote:
> >
> >> Hi Pushkar,
> >>
> >> If you want to keep the order, you could still use the state store I
> >> suggested in my previous e-mail and implement a queue on top of it. For
> >> that you need to put the events into the store with a key that
> >> represents the arrival order of the events. Each time a record is
> >> received from the input topic, the events are read in arrival order from
> >> the state store and the data in the global table is probed. If an event
> >> matches data from the global table the event is removed from the state
> >> store and emitted. If an event does not match data from the global table
> >> the processing is stopped and nothing is emitted.
> >>
> >> Best,
> >> Bruno
> >>
> >> On 21.09.20 14:21, Pushkar Deole wrote:
> >>> Bruno,
> >>>
> >>> 1. the loading of topic mapped to GlobalKTable is done by some other
> >>> service/application so when my application starts up, it will just
> sync a
> >>> GlobalKTable against that topic and if that other service/application
> is
> >>> still starting up then it may not have loaded that data on that topic
> and
> >>> that's the reason it is not available to my application through the
> >>> GlobalKTable.
> >>>
> >>> 2. I don't want out of order processing to happen, that's the reason I
> >> want
> >>> my streams application to keep trying same event until the other
> >>> application starts up and required data becomes available in
> globalKtable
> >>>
> >>>
> >>> On Mon, Sep 21, 2020 at 5:42 PM Bruno Cadonna 
> >> wrote:
> >>>
>  Thank you for clarifying! Now, I think I understand.
> 
>  You could put events for which required data in the global table is
> not
>  available into a state store and each time an event from the input
> topic
>  is processed, you could lookup all events in your state store and see
> if
>  required data is now available for them.
> 
>  However, be aware that this can mix up the original order of the
> events
>  in your input topic if required data of later events is available
> before
>  required data of earlier events. Furthermore, you

Re: Two MirrorMakers 2 for two DCs

2020-09-22 Thread Oleg Osipov
Yes, I use connect-mirror-maker.sh. 



On 2020/09/21 22:12:13, Ryanne Dolan  wrote: 
> Oleg, yes you can run multiple MM2s for multiple DCs, and generally that's
> what you want to do. Are you using Connect to run MM2, or the
> connect-mirror-maker.sh driver?
> 
> Ryanne
> 
> On Mon, Sep 21, 2020, 3:38 PM Oleg Osipov 
> wrote:
> 
> > I use this configuration for MM2 in both datacentres:
> >   clusters:
> > - {"name": "dc1", "bootstrapServers": ip1}
> > - {"name": "dc2", "bootstrapServers": ip2}
> >
> > Do you mean I need to use additional names besides 'dc1' and 'dc2'?
> >
> > On 2020/09/21 17:27:50, nitin agarwal  wrote:
> > > Did you keep the cluster names the same? If yes, then it will cause a
> > > conflict in the metadata stored in MM2's internal topics.
> > >
> > > Thanks,
> > > Nitin
> > >
> > > On Mon, Sep 21, 2020 at 10:36 PM Oleg Osipov  > >
> > > wrote:
> > >
> > > > Hello!
> > > >
> > > > I have two datacenters, DC1 and DC2. When I deploy MM2 in DC1 or DC2,
> > > > all things look correct. I can create a topic, and this topic will be
> > > > synchronized with the other datacenter. In this case, I have only one
> > > > mirror maker. But I want to deploy MM2 in each DC. After I did this,
> > > > newly created topics aren't replicated anymore. That doesn't look like
> > > > correct behavior. Am I wrong? Can I deploy MM2 in two (or even more)
> > > > datacenters?
> > > >
> > >
> >
> 


Re: Kafka streams - how to handle application level exception in event processing

2020-09-22 Thread Bruno Cadonna

Hi Pushkar,

I think there is a misunderstanding. If a consumer polls from a 
partition, it will always poll the next event, independently of whether the 
offset was committed or not. Committed offsets are used for fault 
tolerance, i.e., when a consumer crashes, the consumer that takes over 
the work of the crashed consumer will start polling records from the 
offset the crashed consumer committed last. This is not only true for 
Kafka Streams, but for all applications that use a Kafka consumer with 
subscription.


To be clear, my proposal is not a workaround. This is one approach to 
solve your problem in Kafka Streams. You could have a look into 
stream-stream joins if you can use a stream instead of a global table. 
Another approach would be to use a plain Kafka consumer instead of Kafka 
Streams, which would give you more fine-grained control over polls 
and commits. In any case, be aware that blocking processing on an event 
indefinitely may result in your lag and/or your state growing 
indefinitely.


If you think there is something missing in Kafka Streams, you are very 
welcome to search through the tickets in 
https://issues.apache.org/jira/projects/KAFKA/issues and comment on 
tickets that would solve your issue or create a new one if you cannot 
find any.


Best,
Bruno

On 22.09.20 05:09, Pushkar Deole wrote:

Bruno,

So, essentially, we are just waiting on the processing of first event that
got an error before going ahead on to the next one.

Second, if application handles storing the events in state store for retry,
Kafka stream would essentially commit the offset of those events, so next
event will be polled by consumer, correct?

Instead of this workaround, is there any provision in Kafka Streams for
this scenario? e.g. in case application registers application level
exceptions then kafka streams will take care of it and do all this
internally, and will not commit the offset of that event and hence will
keep polling the same event again?
Since this is a common scenario, using a particular configuration for users
can achieve this in Kafka streams internally?


On Mon, Sep 21, 2020 at 9:01 PM Bruno Cadonna  wrote:


Hi Pushkar,

If you want to keep the order, you could still use the state store I
suggested in my previous e-mail and implement a queue on top of it. For
that you need to put the events into the store with a key that
represents the arrival order of the events. Each time a record is
received from the input topic, the events are read in arrival order from
the state store and the data in the global table is probed. If an event
matches data from the global table the event is removed from the state
store and emitted. If an event does not match data from the global table
the processing is stopped and nothing is emitted.

Best,
Bruno
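
A rough sketch of the buffering approach described above, using the
Processor API; the store names ("event-buffer" for the local queue and
"ref-data-store" for the store backing the GlobalKTable), the string
value encoding, and the enrichment step are illustrative assumptions:

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

public class BufferingProcessor implements Processor<String, String> {

    private ProcessorContext context;
    private KeyValueStore<Long, String> buffer;             // local store used as an arrival-order queue
    private ReadOnlyKeyValueStore<String, String> refData;  // store backing the GlobalKTable
    private long nextSeq = 0L;                              // in practice, recover this from the store on restart

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        this.context = context;
        this.buffer = (KeyValueStore<Long, String>) context.getStateStore("event-buffer");
        this.refData = (ReadOnlyKeyValueStore<String, String>) context.getStateStore("ref-data-store");
    }

    @Override
    public void process(String key, String value) {
        // Append the newly arrived event at the tail of the queue. Storing the event key
        // inside the value is a simplification; a real implementation would use a proper Serde.
        buffer.put(nextSeq++, key + "|" + value);

        // Drain the queue in arrival order (ascending sequence keys) and stop at the first
        // event whose reference data is still missing, so the original order is preserved.
        try (KeyValueIterator<Long, String> it = buffer.all()) {
            while (it.hasNext()) {
                KeyValue<Long, String> entry = it.next();
                String[] parts = entry.value.split("\\|", 2);
                String ref = refData.get(parts[0]);   // probe the global table
                if (ref == null) {
                    break;                            // required data not yet available: keep buffering
                }
                context.forward(parts[0], parts[1] + "|" + ref);
                buffer.delete(entry.key);
            }
        }
    }

    @Override
    public void close() { }
}

The buffer store would be registered via Stores.keyValueStoreBuilder and
attached with Topology#addProcessor; as noted further below, a real
implementation should also cap how many buffered events it drains per call
and bound the buffer's growth.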

On 21.09.20 14:21, Pushkar Deole wrote:

Bruno,

1. the loading of topic mapped to GlobalKTable is done by some other
service/application so when my application starts up, it will just sync a
GlobalKTable against that topic and if that other service/application is
still starting up then it may not have loaded that data on that topic and
that's the reason it is not available to my application through the
GlobalKTable.

2. I don't want out of order processing to happen, that's the reason I

want

my streams application to keep trying same event until the other
application starts up and required data becomes available in globalKtable


On Mon, Sep 21, 2020 at 5:42 PM Bruno Cadonna 

wrote:



Thank you for clarifying! Now, I think I understand.

You could put events for which required data in the global table is not
available into a state store and each time an event from the input topic
is processed, you could lookup all events in your state store and see if
required data is now available for them.

However, be aware that this can mix up the original order of the events
in your input topic if required data of later events is available before
required data of earlier events. Furthermore, you need to consider the
case when you have a huge amount of events in the state store and
suddenly all required data in the global table is available, because
processing all those events at once might lead to exceeding
max.poll.interval.ms and the stream thread might be kicked out of the
consumer group. To solve that, you may want to limit the number of
events processed at once. You also need to avoid the state store growing
indefinitely if required data in the global table is not available for a
long time or not available at all. Maybe all these caveats do not apply
to your use case.

Best,
Bruno


On 21.09.20 13:45, Pushkar Deole wrote:

Say the application level exception is named as:
MeasureDefinitionNotAvaialbleException

What I am trying to achieve is: in the above case, when the event processing
fails due to required data not being available, the streams should not proceed
on to the next event; it should wait for the processing of the current event
to complete. If I just