Awesome, thanks! I'll take a look!
On Mon, Mar 14, 2016 at 11:27 AM, Jason Gustafson <ja...@confluent.io> wrote:

> Yeah, that's the idea. Here's the JIRA I was thinking of:
> https://issues.apache.org/jira/browse/KAFKA-2273. I'm guessing this will
> need a KIP after 0.10 is out.
>
> -Jason
>
> On Mon, Mar 14, 2016 at 11:21 AM, Christian Posta
> <christian.po...@gmail.com> wrote:
>
> > Jason,
> >
> > Can you link to the proposal so I can take a look? Would the "sticky"
> > proposal prefer to keep partitions assigned to consumers who currently
> > have them and have not failed?
> >
> > On Mon, Mar 14, 2016 at 10:16 AM, Jason Gustafson <ja...@confluent.io>
> > wrote:
> >
> > > Hey Michael,
> > >
> > > I don't think a policy of retrying indefinitely is generally possible
> > > with the new consumer even if you had a heartbeat API. The problem is
> > > that the consumer itself doesn't control when the group needs to
> > > rebalance. If another consumer joins or leaves the group, then all
> > > consumers will need to rebalance, regardless of whether they are in
> > > the middle of message processing or not. Once the rebalance completes,
> > > the consumer may or may not get assigned the same partition that the
> > > message came from. That said, if a rebalance is unlikely because the
> > > group is stable, then you could use the pause() API to move the
> > > message processing to a background thread. What this would look like
> > > is basically this:
> > >
> > > 1. Receive message from poll() from partition 0.
> > > 2. Pause partition 0 using pause().
> > > 3. Send the message to a background thread for processing and continue
> > >    calling poll().
> > > 4. When the processing finishes, resume() the partition.
> > > 5. If the group rebalances before processing finishes, there are two
> > >    cases:
> > >    a) if partition 0 is reassigned, pause() it again in the
> > >       onPartitionsAssigned() callback (and you may also want to verify
> > >       that the last committed offset is still what you expect)
> > >    b) otherwise, abort the background processing thread.
> > >
> > > Would that work for your case? It's also worth mentioning that there's
> > > a proposal to add a sticky partition assignor to Kafka, which would
> > > make 5.b less likely.
> > >
> > > -Jason
> > >
> > > On Fri, Mar 11, 2016 at 1:03 AM, Michael Freeman
> > > <mikfree...@gmail.com> wrote:
> > >
> > > > Thanks Christian,
> > > > Sending a heartbeat without having to poll would also be useful when
> > > > using a large max.partition.fetch.bytes.
> > > >
> > > > For now I'm just going to shut the consumer down and restart after x
> > > > period of time.
> > > >
> > > > Thanks for your insights.
> > > >
> > > > Michael
> > > >
> > > > > On 10 Mar 2016, at 18:33, Christian Posta
> > > > > <christian.po...@gmail.com> wrote:
> > > > >
> > > > > Yah that's a good point. That was brought up in another thread.
> > > > >
> > > > > The granularity of what poll() does needs to be addressed. It
> > > > > tries to do too many things at once, including heartbeating. Not
> > > > > so sure that's entirely necessary.
> > > > >
> > > > > On Thu, Mar 10, 2016 at 1:40 AM, Michael Freeman
> > > > > <mikfree...@gmail.com> wrote:
> > > > >
> > > > >> Thanks Christian,
> > > > >> We would want to retry indefinitely. Or at least for say x
> > > > >> minutes. If we don't poll, how do we keep the heartbeat alive to
> > > > >> Kafka? We never want to lose this message and only want to commit
> > > > >> to Kafka when the message is in Mongo. That's either as a
> > > > >> successful message in a collection or an unsuccessful message in
> > > > >> an error collection.
> > > > >>
> > > > >> Right now I let the consumer die and don't create a new one for x
> > > > >> minutes. This causes a lot of rebalancing.
> > > > >>
> > > > >> Michael
> > > > >>
> > > > >>> On 9 Mar 2016, at 21:12, Christian Posta
> > > > >>> <christian.po...@gmail.com> wrote:
> > > > >>>
> > > > >>> So you have to decide how long you're willing to "wait" for the
> > > > >>> mongo db to come back, and what you'd like to do with that
> > > > >>> message. So for example, do you just retry inserting to Mongo
> > > > >>> for a predefined period of time? Do you try forever? If you try
> > > > >>> forever, are you okay with the consumer threads blocking
> > > > >>> indefinitely? Or maybe you implement a "circuit breaker" to shed
> > > > >>> load to mongo? Or are you willing to stash the message into a
> > > > >>> DLQ and move on and try the next message?
> > > > >>>
> > > > >>> You don't need to "re-consume" the message do you? Can you just
> > > > >>> retry and/or backoff-retry with the message you have? And just
> > > > >>> do the "commit" of the offset if successful?
> > > > >>>
> > > > >>> On Wed, Mar 9, 2016 at 2:00 PM, Michael Freeman
> > > > >>> <mikfree...@gmail.com> wrote:
> > > > >>>
> > > > >>>> Hey,
> > > > >>>> My team is new to Kafka and we are using the examples found at:
> > > > >>>>
> > > > >>>> http://www.confluent.io/blog/tutorial-getting-started-with-the-new-apache-kafka-0.9-consumer-client
> > > > >>>>
> > > > >>>> We process messages from Kafka and persist them to Mongo.
> > > > >>>> If Mongo is unavailable we are wondering how we can re-consume
> > > > >>>> the messages while we wait for Mongo to come back up.
> > > > >>>>
> > > > >>>> Right now we commit after the messages for each partition are
> > > > >>>> processed (following the example).
> > > > >>>> I have tried a few approaches.
> > > > >>>>
> > > > >>>> 1. Catch the application exception and skip the Kafka commit.
> > > > >>>>    However the next poll does not re-consume the messages.
> > > > >>>> 2. Allow the consumer to fail and restart the consumer. This
> > > > >>>>    works but causes a rebalance.
> > > > >>>>
> > > > >>>> Should I attempt to store the offset and partition (in memory)
> > > > >>>> instead and attempt to reseek in order to re-consume the
> > > > >>>> messages?
> > > > >>>>
> > > > >>>> What's the best practice approach in this kind of situation? My
> > > > >>>> priority is to never lose a message and to ensure it makes it
> > > > >>>> to Mongo. (Redelivery is ok)
> > > > >>>>
> > > > >>>> Thanks for any help or pointers in the right direction.
> > > > >>>>
> > > > >>>> Michael

--
*Christian Posta*
twitter: @christianposta
http://www.christianposta.com/blog
http://fabric8.io
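
[Editor's sketch, not from any of the posters.] The "retry and/or backoff-retry with the message you have" option discussed in the thread could look roughly like the helper below. It is a minimal illustration under stated assumptions: `BoundedRetry`, `Outcome`, and all parameter names are hypothetical and are not part of the Kafka or Mongo client APIs, the Mongo insert is represented by an arbitrary `Callable`, and the time budget counts only the time spent backing off between attempts.

```java
import java.util.concurrent.Callable;
import java.util.function.LongConsumer;

/** Hypothetical helper for illustration; not part of any Kafka or Mongo API. */
final class BoundedRetry {

    /** STORED: the write succeeded. GAVE_UP: the budget ran out or the thread was interrupted. */
    enum Outcome { STORED, GAVE_UP }

    /**
     * Retries {@code write} with exponential backoff until it succeeds or the
     * backoff budget is used up.
     *
     * @param write            the operation to retry (e.g. an insert into Mongo)
     * @param maxWaitMs        total time allowed to be spent backing off
     * @param initialBackoffMs first delay between attempts (must be > 0)
     * @param maxBackoffMs     upper bound for a single delay
     * @param sleeper          performs the delay; injected so tests need not really sleep
     */
    static Outcome run(Callable<Void> write, long maxWaitMs, long initialBackoffMs,
                       long maxBackoffMs, LongConsumer sleeper) {
        long waited = 0;
        long backoff = initialBackoffMs;
        while (true) {
            try {
                write.call();
                return Outcome.STORED;
            } catch (InterruptedException e) {
                // Preserve the interrupt flag and stop retrying.
                Thread.currentThread().interrupt();
                return Outcome.GAVE_UP;
            } catch (Exception e) {
                if (waited >= maxWaitMs) {
                    return Outcome.GAVE_UP; // budget exhausted: caller decides (error collection, DLQ, ...)
                }
                // Never sleep past the remaining budget.
                long pause = Math.min(backoff, maxWaitMs - waited);
                sleeper.accept(pause);
                waited += pause;
                backoff = Math.min(backoff * 2, maxBackoffMs);
            }
        }
    }
}
```

In a consumer loop, the idea would be to commit the offset only when the outcome is `STORED`, or after the message has been written to the error collection following `GAVE_UP`. Note that blocking the polling thread for the whole budget still runs into the heartbeat concern raised in the thread, so a long budget would presumably need to be combined with the pause()/background-thread steps Jason lists.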