TimeoutExceptions during commitSync and committed
Hi, we have a consumer that occasionally catches a TimeoutException when trying to commit an offset after polling. Since it's a RetriableException, the consumer tries to roll back and read from the last committed offset. However, when it tries to fetch the last committed offset with committed(), that call throws another TimeoutException.

[Logs from committed()]: org.apache.kafka.common.errors.TimeoutException: Timeout of 6ms expired before the last committed offset for partition could be determined.

[Logs from commitSync()]: Offset commit failed on partition at offset : The coordinator is loading and hence can't process requests. (repeated several times)

I've tried raising default.api.timeout.ms to two minutes with no luck; we still see the same behavior after a total of four minutes.

1. Does this indicate that the broker is reloading its cache?
2. What could cause the coordinator to be in a loading state after a poll?
3. Is there anything in our consumer that could cause this behavior, e.g. polling intervals?
4. Is there anything in our broker that could cause this behavior, e.g. rebalancing?
5. If the broker is reloading its cache, shouldn't the heartbeats sent from the consumer prevent this?

Kind regards, Sergio
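Since commitSync() throws RetriableException subclasses while the coordinator is loading, one common approach is to wrap the commit in a bounded retry with exponential backoff instead of giving up on the first timeout. The sketch below is illustrative, not a Kafka API: retryWithBackoff is a hypothetical helper, and the failing-then-succeeding Callable stands in for consumer.commitSync() (in real code you would catch only org.apache.kafka.common.errors.RetriableException).

```java
import java.util.concurrent.Callable;

public class RetryCommit {
    // Retries a retriable operation with exponential backoff (hypothetical helper).
    static <T> T retryWithBackoff(Callable<T> op, int maxAttempts, long initialBackoffMs)
            throws Exception {
        long backoff = initialBackoffMs;
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return op.call();
            } catch (Exception e) { // in real code: catch RetriableException only
                last = e;
                if (attempt < maxAttempts) {
                    Thread.sleep(backoff);
                    backoff *= 2; // double the wait between attempts
                }
            }
        }
        throw last;
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        // Stand-in for consumer.commitSync(): fails twice, then succeeds.
        String result = retryWithBackoff(() -> {
            if (++calls[0] < 3) throw new RuntimeException("coordinator loading");
            return "committed";
        }, 5, 10);
        System.out.println(result + " after " + calls[0] + " attempts");
    }
}
```

Note that retrying only helps if the coordinator recovers within the retry window; if it is loading for minutes (as described above), the root cause on the broker side still needs investigating.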
Consumer TimeoutException
Hi all, I'm frequently getting the error below in one of the application consumers. From the error, what I can infer is that the offset commit failed due to a timeout after 30 seconds. One suggestion was to increase the timeout, but I think that would just extend the window. What would be a good way to handle this?

*Note:* The consumer has auto commit disabled, and commitAsync is performed after every poll.

failed org.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset commit failed with a retriable exception. You should retry committing the latest consumed offsets. Caused by: org.apache.kafka.common.errors.TimeoutException: Failed to send request after 3 ms.

Thanks
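For the commitAsync-after-every-poll setup described above, a widely used pattern is: don't retry a failed commitAsync (a later async commit supersedes an earlier failed one), and issue a single blocking commitSync when the consumer shuts down so the final offsets are not lost. The sketch below models that pattern with a hypothetical Committer interface standing in for KafkaConsumer, since a live broker isn't available here; in real code the two methods would be consumer.commitAsync(callback) and consumer.commitSync().

```java
import java.util.ArrayList;
import java.util.List;

public class CommitPattern {
    // Stand-in for the KafkaConsumer commit API (hypothetical interface).
    interface Committer {
        void commitAsync(Runnable onFailure); // fire-and-forget during the poll loop
        void commitSync();                    // blocking, used once on shutdown
    }

    // commitAsync in the loop; one commitSync in finally as a final barrier.
    static List<String> runLoop(Committer c, int polls) {
        List<String> log = new ArrayList<>();
        try {
            for (int i = 0; i < polls; i++) {
                // ... process records returned by poll() ...
                c.commitAsync(() -> log.add("async commit failed (superseded later)"));
                log.add("commitAsync " + i);
            }
        } finally {
            c.commitSync(); // ensures the last offsets are committed before close
            log.add("commitSync on close");
        }
        return log;
    }

    public static void main(String[] args) {
        List<String> log = runLoop(new Committer() {
            public void commitAsync(Runnable onFailure) { /* pretend success */ }
            public void commitSync() { }
        }, 2);
        System.out.println(log);
    }
}
```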
How to create dynamic listeners by reading configuration details in plugin.conf file
The set of topics to listen to should be configured in a plugin.conf file. Instead of a single ConsumerListener, there should be multiple such listeners, all started dynamically based on the configuration in the plugin.conf file.
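One way to sketch this, assuming a simple key/value plugin.conf format: parse the topic list from the file, then start one listener per topic. The listeners.topics key and the stubbed listener threads below are illustrative (the question's ConsumerListener is not shown); a real listener would build a KafkaConsumer subscribed to its topic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

public class DynamicListeners {
    // Extracts a comma-separated topic list from a plugin.conf-style Properties.
    static List<String> topicsFrom(Properties conf) {
        String raw = conf.getProperty("listeners.topics", "");
        List<String> topics = new ArrayList<>();
        for (String t : raw.split(",")) {
            if (!t.trim().isEmpty()) topics.add(t.trim());
        }
        return topics;
    }

    public static void main(String[] args) {
        Properties conf = new Properties(); // would be conf.load(new FileReader("plugin.conf"))
        conf.setProperty("listeners.topics", "orders, payments, audit");

        List<Thread> listeners = new ArrayList<>();
        for (String topic : topicsFrom(conf)) {
            // Real code: construct a ConsumerListener subscribed to `topic` here.
            Thread t = new Thread(() -> System.out.println("listening on " + topic));
            listeners.add(t);
            t.start();
        }
        System.out.println(listeners.size() + " listeners started");
    }
}
```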
Re: Trigger topic compaction before uploading to S3
These properties can't be triggered programmatically. Kafka uses an internal thread pool called the "log cleaner" that does the job asynchronously of deleting old segments ("delete") and deleting repeated records ("compact"). Whatever the S3 connector picks up is already compacted and/or deleted. — Ricardo On Tue, 2020-09-22 at 11:50 +0200, Daniel Kraus wrote: > Hi, > I have a KStreams app that outputs a KTable to a topic with cleanup > policy "compact,delete". > I have the Confluent S3 Connector to store this table in S3 where I do > further analysis with Hive. > Now my question is, if there's a way to trigger log compaction right > before the S3 Connector reads the data so I store less data in S3 > than when it simply copies all data from the stream? > Thanks, Daniel
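Since compaction can't be invoked on demand, the closest available lever is tuning the topic so the log cleaner runs sooner and more aggressively before the connector reads. A sketch of the relevant topic-level settings (the values are illustrative, not recommendations; only rolled, inactive segments are ever cleaned):

```properties
cleanup.policy=compact,delete
# Compact a log once 10% of it is "dirty" (default is 0.5):
min.cleanable.dirty.ratio=0.1
# Roll segments hourly so records become eligible for cleaning sooner:
segment.ms=3600000
# Minimum time a record must stay uncompacted after being written:
min.compaction.lag.ms=0
```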
Re: Not able to connect to bootstrap server when one broker down
Hi Manoj, Thanks but we caught the issue, it was coming most probably because the wrong jar was being picked up from hdfs and was being set in oozie classpath at runtime. In our code, kafka-client is on 2.3 but while running MR job 0.8.2.0 jar was being picked up. We caught it after seeing the producer client logs in our application, kafka-client version was different there. After removing that old jar, we have not observed this issue since. *Regards,* *Prateek Rajput* On Wed, Aug 26, 2020 at 9:55 AM wrote: > What error you are getting , can you share the exact error ? > What is version of kafka lib at client side ? > > On 8/25/20, 7:50 AM, "Prateek Rajput" > wrote: > > [External] > > > Hi, please if anyone can help, will be a huge favor. > > *Regards,* > *Prateek Rajput* > > > On Tue, Aug 25, 2020 at 12:06 AM Prateek Rajput < > prateek.raj...@flipkart.com> > wrote: > > > Hi everyone, > > I am new to Kafka, and recently started working on kafka in my > company. We > > recently migrated our client and cluster from the *0.10.x* version to > > *2.3.0*. I am facing this issue quite often. > > I have provided all brokers in *bootstrap.servers* config to > instantiate > > the producer client but while using this client for batch publishing, > > sometimes some of my mappers get stuck. > > I debugged and found that one broker was down (for some maintenance > > activity). Now it was getting stuck because the mapper's client was > trying > > to connect to that node only for the very first time. And it was > failing > > with NoRouteToHost Exception. > > I have read that the very first time the client will select a random > > broker and will try to connect with that broker to get the meta-data > of the > > whole cluster. Is there any way so that on such exceptions it can > switch to > > another node dynamically and should not try to connect to the same > box > > again and again. 
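On the original question about the first metadata fetch: the client picks a broker from bootstrap.servers for its initial metadata request, and newer clients will try the other entries if one host is unreachable (exact failover behavior varies by client version). Listing several brokers rather than one therefore reduces the chance of getting stuck on a downed host. A minimal config sketch with hypothetical hostnames:

```properties
# Several brokers, so the initial metadata fetch does not depend on any single host:
bootstrap.servers=broker1:9092,broker2:9092,broker3:9092
```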
> > *Regards,* > > *Prateek Rajput*
Trigger topic compaction before uploading to S3
Hi, I have a KStreams app that outputs a KTable to a topic with cleanup policy "compact,delete". I have the Confluent S3 Connector to store this table in S3, where I do further analysis with Hive. Now my question is: is there a way to trigger log compaction right before the S3 Connector reads the data, so I store less data in S3 than when it simply copies all data from the stream? Thanks, Daniel
Re: Kafka streams - how to handle application level exception in event processing
Thank you Gilles, will take a look. Bruno, thanks for your elaborate explanation as well. However, it basically exposes my application to certain issues. E.g., the application deals with agent states in a call center, where the order of processing is important. While an agent is logged in, he keeps rotating between Ready and Not Ready states, and at the end of the day he becomes Logged Out. If, while processing the Ready event, there is some temporary issue with the database/network and the event processing gets an exception, the application does a few retries but with no luck. As per Kafka polling, it will go ahead and poll the next record from the partition for the same agent (the agent id being the key) and it will process the Logged Out event. So does this mean I lost the Ready event in between due to the database issue? Even if I store this event somewhere to process it later, processing the Ready event after Logged Out doesn't make sense, since the order of states is important. Is my understanding correct? On Tue, Sep 22, 2020 at 1:32 PM Gilles Philippart wrote: > Hi Pushkar, > > Uber has written about how they deal with failures and reprocessing here, > it might help you achieve what you describe: > https://eng.uber.com/reliable-reprocessing/. > > Unfortunately, there isn't much written documentation about those patterns. > There's also a good talk from Confluent's Antony Stubbs on how you can do > certain things with the Processor API that you can't do with the Kafka > Streams DSL: > > https://www.confluent.io/kafka-summit-lon19/beyond-dsl-unlocking-power-kafka-streams-processor-api > . > > Gilles Philippart > Funding Circle Engineering > > On Tue, 22 Sep 2020 at 08:12, Bruno Cadonna wrote: > > Hi Pushkar, > > > > I think there is a misunderstanding. If a consumer polls from a > > partition, it will always poll the next event independently whether the > > offset was committed or not. 
Committed offsets are used for fault > > tolerance, i.e., when a consumer crashes, the consumer that takes over > > the work of the crashed consumer will start polling record from the > > offset the crashed consumer committed last. This is not only true for > > Kafka Streams, but for all applications that use a Kafka consumer with > > subscription. > > > > To be clear, my proposal is not a workaround. This is one approach to > > solve your problem in Kafka Streams. You could have a look into > > stream-stream joins if you can use a stream instead of a global table. > > Another approach would be to use a plain Kafka consumer instead of Kafka > > Stream with which you would have a more fine-grained control about polls > > and commits. In any case, be aware that blocking processing on an event > > indefinitely may result in your lag and/or your state growing > > indefinitely. > > > > If you think there is something missing in Kafka Streams, you are very > > welcome to search through the tickets in > > https://issues.apache.org/jira/projects/KAFKA/issues and comment on > > tickets that would solve your issue or create a new one if you cannot > > find any. > > > > Best, > > Bruno > > > > On 22.09.20 05:09, Pushkar Deole wrote: > > > Bruno, > > > > > > So, essentially, we are just waiting on the processing of first event > > that > > > got an error before going ahead on to the next one. > > > > > > Second, if application handles storing the events in state store for > > retry, > > > Kafka stream would essentially commit the offset of those events, so > next > > > event will be polled by consumer, correct? > > > > > > Instead of this work around, is there any provision in kafka streams > for > > > this scenario? e.g. in case application registers application level > > > exceptions then kafka streams will take care of it and do all this > > > internally, and will not commit the offset of that event and hence will > > > keep polling the same event again? 
> > > Since this is a common scenario, using a particular configuration for > > users > > > can achieve this in Kafka streams internally? > > > > > > > > > On Mon, Sep 21, 2020 at 9:01 PM Bruno Cadonna > > wrote: > > > > > >> Hi Pushkar, > > >> > > >> If you want to keep the order, you could still use the state store I > > >> suggested in my previous e-mail and implement a queue on top of it. > For > > >> that you need to put the events into the store with a key that > > >> represents the arrival order of the events. Each time a record is > > >> received from the input topic, the events are read in arrival order > from > > >> the state store and the data in the global table is probed. If an > event > > >> matches data from the global table the event is removed from the state > > >> store and emitted. If an event does not match data from the global > table > > >> the processing is stopped and nothing is emitted. > > >> > > >> Best, > > >> Bruno > > >> > > >> On 21.09.20 14:21, Pushkar Deole wrote: > > >>> Bruno, > > >>> > > >>> 1. the loading of topic mapped to GlobalKTable is done by some other > > >>> service/appli
Re: Kafka streams - how to handle application level exception in event processing
Hi Pushkar, Uber has written about how they deal with failures and reprocessing here, it might help you achieve what you describe: https://eng.uber.com/reliable-reprocessing/. Unfortunately, there isn't much written documentation about those patterns. There's also a good talk from Confluent's Antony Stubbs on how you can do certain things with the Processor API that you can't do with the Kafka Streams DSL: https://www.confluent.io/kafka-summit-lon19/beyond-dsl-unlocking-power-kafka-streams-processor-api . Gilles Philippart Funding Circle Engineering On Tue, 22 Sep 2020 at 08:12, Bruno Cadonna wrote: > Hi Pushkar, > > I think there is a misunderstanding. If a consumer polls from a > partition, it will always poll the next event independently whether the > offset was committed or not. Committed offsets are used for fault > tolerance, i.e., when a consumer crashes, the consumer that takes over > the work of the crashed consumer will start polling record from the > offset the crashed consumer committed last. This is not only true for > Kafka Streams, but for all applications that use a Kafka consumer with > subscription. > > To be clear, my proposal is not a workaround. This is one approach to > solve your problem in Kafka Streams. You could have a look into > stream-stream joins if you can use a stream instead of a global table. > Another approach would be to use a plain Kafka consumer instead of Kafka > Stream with which you would have a more fine-grained control about polls > and commits. In any case, be aware that blocking processing on an event > indefinitely may result in your lag and/or your state growing > indefinitely. > > If you think there is something missing in Kafka Streams, you are very > welcome to search through the tickets in > https://issues.apache.org/jira/projects/KAFKA/issues and comment on > tickets that would solve your issue or create a new one if you cannot > find any. 
> > Best, > Bruno > > On 22.09.20 05:09, Pushkar Deole wrote: > > Bruno, > > > > So, essentially, we are just waiting on the processing of first event > that > > got an error before going ahead on to the next one. > > > > Second, if application handles storing the events in state store for > retry, > > Kafka stream would essentially commit the offset of those events, so next > > event will be polled by consumer, correct? > > > > Instead of this work around, is there any provision in kafka streams for > > this scenario? e.g. in case application registers application level > > exceptions then kafka streams will take care of it and do all this > > internally, and will not commit the offset of that event and hence will > > keep polling the same event again? > > Since this is a common scenario, using a particular configuration for > users > > can achieve this in Kafka streams internally? > > > > > > On Mon, Sep 21, 2020 at 9:01 PM Bruno Cadonna > wrote: > > > >> Hi Pushkar, > >> > >> If you want to keep the order, you could still use the state store I > >> suggested in my previous e-mail and implement a queue on top of it. For > >> that you need to put the events into the store with a key that > >> represents the arrival order of the events. Each time a record is > >> received from the input topic, the events are read in arrival order from > >> the state store and the data in the global table is probed. If an event > >> matches data from the global table the event is removed from the state > >> store and emitted. If an event does not match data from the global table > >> the processing is stopped and nothing is emitted. > >> > >> Best, > >> Bruno > >> > >> On 21.09.20 14:21, Pushkar Deole wrote: > >>> Bruno, > >>> > >>> 1. 
the loading of topic mapped to GlobalKTable is done by some other > >>> service/application so when my application starts up, it will just > sync a > >>> GlobalKTable against that topic and if that other service/application > is > >>> still starting up then it may not have loaded that data on that topic > and > >>> that's the reason it is not available to my application through the > >>> GlobalKTable. > >>> > >>> 2. I don't want out of order processing to happen, that's the reason I > >> want > >>> my streams application to keep trying same event until the other > >>> application starts up and required data becomes available in > globalKtable > >>> > >>> > >>> On Mon, Sep 21, 2020 at 5:42 PM Bruno Cadonna > >> wrote: > >>> > Thank you for clarifying! Now, I think I understand. > > You could put events for which required data in the global table is > not > available into a state store and each time an event from the input > topic > is processed, you could lookup all events in your state store and see > if > required data is now available for them. > > However, be aware that this can mix up the original order of the > events > in your input topic if required data of later events is available > before > required data of earlier events. Furthermore, you
Re: Two MirrorMakers 2 for two DCs
Yes, I use connect-mirror-maker.sh. On 2020/09/21 22:12:13, Ryanne Dolan wrote: > Oleg, yes you can run multiple MM2s for multiple DCs, and generally that's > what you want to do. Are you using Connect to run MM2, or the > connect-mirror-maker.sh driver? > > Ryanne > > On Mon, Sep 21, 2020, 3:38 PM Oleg Osipov > wrote: > > I use this configuration for MM2 in both datacenters: > > clusters: > > - {"name": "dc1", "bootstrapServers": ip1} > > - {"name": "dc2", "bootstrapServers": ip2} > > > > Do you mean I need to use additional names besides 'dc1' and 'dc2'? > > > > On 2020/09/21 17:27:50, nitin agarwal wrote: > > > Did you keep the cluster name the same? If yes, then it will cause > > > conflicts in the metadata stored in MM2's internal topics. > > > > > > Thanks, > > > Nitin > > > > > > On Mon, Sep 21, 2020 at 10:36 PM Oleg Osipov > > > wrote: > > > > Hello! > > > > I have two datacenters, DC1 and DC2. When I deploy MM2 in DC1 or DC2, everything > > > > looks correct: I can create a topic, and this topic will be > > > > synchronized with the other datacenter. In this case, I have only one mirror > > > > maker. But when I deploy MM2 in each DC, newly > > > > created topics aren't replicated anymore. That doesn't look like correct > > > > behavior. Am I wrong? Can I deploy MM2 in two (or even more) > > > > datacenters?
Re: Kafka streams - how to handle application level exception in event processing
Hi Pushkar,

I think there is a misunderstanding. If a consumer polls from a partition, it will always poll the next event, independently of whether the offset was committed or not. Committed offsets are used for fault tolerance, i.e., when a consumer crashes, the consumer that takes over the work of the crashed consumer will start polling records from the offset the crashed consumer committed last. This is not only true for Kafka Streams, but for all applications that use a Kafka consumer with subscription.

To be clear, my proposal is not a workaround. It is one approach to solve your problem in Kafka Streams. You could have a look into stream-stream joins if you can use a stream instead of a global table. Another approach would be to use a plain Kafka consumer instead of Kafka Streams, which would give you more fine-grained control over polls and commits. In any case, be aware that blocking processing on an event indefinitely may result in your lag and/or your state growing indefinitely.

If you think there is something missing in Kafka Streams, you are very welcome to search through the tickets in https://issues.apache.org/jira/projects/KAFKA/issues and comment on tickets that would solve your issue, or create a new one if you cannot find any.

Best, Bruno

On 22.09.20 05:09, Pushkar Deole wrote: Bruno, so, essentially, we are just waiting on the processing of the first event that got an error before going ahead to the next one. Second, if the application handles storing the events in a state store for retry, Kafka Streams would essentially commit the offsets of those events, so the next event will be polled by the consumer, correct? Instead of this workaround, is there any provision in Kafka Streams for this scenario? E.g., if the application registers application-level exceptions, then Kafka Streams takes care of it and does all this internally: it does not commit the offset of that event and hence keeps polling the same event again? Since this is a common scenario, could a particular configuration let users achieve this in Kafka Streams internally?

On Mon, Sep 21, 2020 at 9:01 PM Bruno Cadonna wrote: Hi Pushkar, if you want to keep the order, you could still use the state store I suggested in my previous e-mail and implement a queue on top of it. For that, you need to put the events into the store with a key that represents the arrival order of the events. Each time a record is received from the input topic, the events are read in arrival order from the state store and the data in the global table is probed. If an event matches data from the global table, the event is removed from the state store and emitted. If an event does not match data from the global table, the processing is stopped and nothing is emitted. Best, Bruno

On 21.09.20 14:21, Pushkar Deole wrote: Bruno, 1. the loading of the topic mapped to the GlobalKTable is done by some other service/application, so when my application starts up, it will just sync a GlobalKTable against that topic; if that other service/application is still starting up, it may not have loaded that data onto the topic yet, and that's the reason it is not available to my application through the GlobalKTable. 2. I don't want out-of-order processing to happen; that's the reason I want my streams application to keep trying the same event until the other application starts up and the required data becomes available in the GlobalKTable.

On Mon, Sep 21, 2020 at 5:42 PM Bruno Cadonna wrote: Thank you for clarifying! Now, I think I understand. You could put events for which required data in the global table is not available into a state store, and each time an event from the input topic is processed, you could look up all events in your state store and see if the required data is now available for them. However, be aware that this can mix up the original order of the events in your input topic if required data of later events is available before required data of earlier events. Furthermore, you need to consider the case when you have a huge amount of events in the state store and suddenly all required data in the global table is available, because processing all those events at once might lead to exceeding max.poll.interval.ms, and the stream thread might be kicked out of the consumer group. To solve that, you may want to limit the number of events processed at once. You also need to avoid the state store growing indefinitely if required data in the global table is not available for a long time, or not available at all. Maybe all these caveats do not apply to your use case. Best, Bruno

On 21.09.20 13:45, Pushkar Deole wrote: Say the application-level exception is named MeasureDefinitionNotAvailableException. What I am trying to achieve is: in the above case, when the event processing fails because required data is not available, the streams app should not proceed to the next event; it should wait for the processing of the current event to complete. If I just
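Bruno's queue-on-a-state-store suggestion can be sketched without a running Streams topology. In the code below, a LinkedHashMap keyed by arrival order stands in for the Kafka Streams KeyValueStore, a Set stands in for the GlobalKTable, and maxDrainPerPoll caps the work done per record to stay under max.poll.interval.ms. All class and method names are illustrative; draining stops at the first blocked event to preserve order, exactly as described above.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;

public class BufferedRetry {
    // Stand-in for a KeyValueStore keyed by arrival order (insertion-ordered).
    private final LinkedHashMap<Long, String> buffer = new LinkedHashMap<>();
    private long seq = 0;
    private final int maxDrainPerPoll; // bound work per record (max.poll.interval.ms caveat)

    BufferedRetry(int maxDrainPerPoll) { this.maxDrainPerPoll = maxDrainPerPoll; }

    // Called per incoming record; `ready` probes the global table for required data.
    List<String> onRecord(String event, Predicate<String> ready) {
        buffer.put(seq++, event);
        List<String> emitted = new ArrayList<>();
        Iterator<Map.Entry<Long, String>> it = buffer.entrySet().iterator();
        while (it.hasNext() && emitted.size() < maxDrainPerPoll) {
            Map.Entry<Long, String> e = it.next();
            if (!ready.test(e.getValue())) break; // keep order: stop at first blocked event
            emitted.add(e.getValue());
            it.remove();
        }
        return emitted;
    }

    public static void main(String[] args) {
        BufferedRetry p = new BufferedRetry(10);
        Set<String> table = new HashSet<>();                          // stand-in GlobalKTable
        System.out.println(p.onRecord("Ready", table::contains));     // blocked: emits nothing
        table.add("Ready");
        table.add("LoggedOut");
        System.out.println(p.onRecord("LoggedOut", table::contains)); // drains both, in order
    }
}
```

A production version would also need the eviction policy Bruno mentions, so the buffer cannot grow without bound when required data never arrives.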