Hey Yui,

Sorry, I haven't had a chance to respond. I've got a pretty busy couple of
weeks coming up, so I don't know when I'll look at this, but I find this
puzzling. I'll save your email and try what you said to see if I can figure
it out. Thanks for the repro code.
Let me know if you figure it out. Also, if you think you've found a bug,
feel free to file a Jira ticket as well. It might get broader visibility
that way.

Thanks,
-John

On Thu, Sep 13, 2018 at 1:57 AM Yui Yoi <shalosh...@gmail.com> wrote:

> Hi Adam and John, thank you for your effort!
> We are implementing full idempotency in our projects, so that's nothing
> to worry about.
> As to what John said: we only have one partition, I personally made sure
> of that.
> So, as I wrote in section 2 of my first message in this conversation, my
> stream should have processed the "asd" message again because it is not
> committed yet.
> That's why I suspect it has something to do with the stream's cache;
> maybe something like:
> 1. "asd" got processed and restored in cache
> 2. "{}" got processed and cached too.
> 3. commit interval makes the stream commit the offset of "{}"
>
> B.t.w.
> If you want to run my application you should:
> 1. open it in some Java editor as a Maven project
> 2. run it as a normal Java application
> 3. set up a Kafka server & ZooKeeper on localhost
> 4. then you can send the above messages via the CLI
>
> John - even if you send "asd1", "asd2", "asd3" you will see in the logs
> that my app takes the latest each time
>
> Of course that's far beyond what I can ask you guys to do, thanks a lot
> for your help.
>
> On Wed, Sep 12, 2018 at 8:14 PM John Roesler <j...@confluent.io> wrote:
>
> > Hi!
> >
> > As Adam said, if you throw an exception during processing, it should
> > cause Streams to shut itself down and *not* commit that message.
> > Therefore, when you start up again, it should again attempt to process
> > that same message (and shut down again).
> >
> > Within a single partition, messages are processed in order, so a bad
> > message will block the queue, and you should not see subsequent
> > messages get processed.
> >
> > However, if your later message "{}" goes to a different partition than
> > the bad message, then there's no relationship between them, and the
> > later, good, message might get processed.
> >
> > Does that help?
> > -John
> >
> > On Wed, Sep 12, 2018 at 8:38 AM Adam Bellemare
> > <adam.bellem...@gmail.com> wrote:
> >
> > > Hi Yui Yoi
> > >
> > > Keep in mind that Kafka Consumers don't traditionally request only a
> > > single message at a time, but instead request them in batches. This
> > > allows for much higher throughput, but does result in the scenario of
> > > "at-least-once" processing. Generally what will happen in this
> > > scenario is the following:
> > >
> > > 1) Client requests the next set of messages from offset (t). For
> > > example, assume it gets 10 messages and message 6 is "bad".
> > > 2) The client's processor will then process the messages one at a
> > > time. Note that the offsets are not committed after the message is
> > > processed, but only at the end of the batch.
> > > 3) The bad message is hit by the processor. At this point you can
> > > decide to skip the message, throw an exception, etc.
> > > 4a) If you decide to skip the message, processing will continue. Once
> > > all 10 messages are processed, the new offset (t+10) is committed
> > > back to Kafka.
> > > 4b) If you decide to throw an exception and terminate your app, you
> > > will have still processed the messages that came before the bad
> > > message. Because the offset (t+10) is not committed, the next time
> > > you start the app it will consume from offset t, and those messages
> > > will be processed again. This is "at-least-once" processing.
> > >
> > > Now, if you need exactly-once processing, you have two choices -
> > > 1) Use Kafka Streams with exactly-once semantics (though, as I am not
> > > familiar with your framework, it may support it as well).
> > > 2) Use idempotent practices (i.e., it doesn't matter if the same
> > > messages get processed more than once).
> > >
> > > Hope this helps -
> > >
> > > Adam
> > >
> > > On Wed, Sep 12, 2018 at 7:59 AM, Yui Yoi <shalosh...@gmail.com> wrote:
> > >
> > > > Hi Adam,
> > > > Thanks a lot for the rapid response, it did help!
> > > >
> > > > Let me ask one more simple question, though: Can I make a stream
> > > > application get stuck on an invalid message, and not consume any
> > > > further messages?
> > > >
> > > > Thanks again
> > > >
> > > > On Wed, Sep 12, 2018 at 2:35 PM Adam Bellemare
> > > > <adam.bellem...@gmail.com> wrote:
> > > >
> > > > > Hi Yui Yoi
> > > > >
> > > > > Preface: I am not familiar with the Spring framework.
> > > > >
> > > > > "Earliest" when it comes to consuming from Kafka means, "Start
> > > > > reading from the first message in the topic, *if there is no
> > > > > offset stored for that consumer group*". It sounds like you are
> > > > > expecting it to re-read each message whenever a new message comes
> > > > > in. This is not going to happen, as there will be a committed
> > > > > offset and "earliest" will no longer be used. If you were to use
> > > > > "latest" instead, if a consumer is started that does not have a
> > > > > valid offset, it would use the very latest message in the topic as
> > > > > the starting offset for message consumption.
> > > > >
> > > > > Now, if you are using the same consumer group each time you run
> > > > > the application (which it seems is true, as you have "test-group"
> > > > > hardwired in your application.yml), but you do not tear down your
> > > > > local cluster and clear out its state, you will indeed see the
> > > > > behaviour you describe. Remember that Kafka is durable, and
> > > > > maintains the offsets when the individual applications go away.
> > > > > So you are probably seeing this:
> > > > >
> > > > > 1) Start application instance 1. It realizes it has no offset when
> > > > > it tries to register as a consumer on the input topic, so it
> > > > > creates a new consumer entry for "earliest" for your consumer
> > > > > group.
> > > > > 2) Send message "asd".
> > > > > 3) Application instance 1 receives "asd", processes it, and
> > > > > updates the offset (offset head = 1).
> > > > > 4) Terminate instance 1.
> > > > > 5) Start application instance 2. It detects correctly that
> > > > > consumer group "test-group" is available and reads that offset as
> > > > > its starting point.
> > > > > 6) Send message "{}".
> > > > > 7) Application instance 2 receives "{}", processes it, and updates
> > > > > the offset (offset head = 2).
> > > > > *NOTE:* App instance 2 NEVER received "asd", nor should it, as it
> > > > > is telling the Kafka cluster that it belongs to the same consumer
> > > > > group as application 1.
> > > > >
> > > > > Hope this helps,
> > > > >
> > > > > Adam
> > > > >
> > > > > On Wed, Sep 12, 2018 at 6:57 AM, Yui Yoi <shalosh...@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > TL;DR:
> > > > > > my streams application skips uncommitted messages
> > > > > >
> > > > > > Hello,
> > > > > > I'm using the Streams API via the Spring framework and
> > > > > > experiencing a weird behavior which I would like to get an
> > > > > > explanation for.
> > > > > > First of all: the attached zip is my test project. I used the
> > > > > > Kafka CLI to run a localhost broker and ZooKeeper.
> > > > > >
> > > > > > What is happening is as follows:
> > > > > > 1. I send an invalid message, such as "asd", and my consumer
> > > > > > shows a lag and an error message, as expected.
> > > > > > 2. I send a valid message such as "{}", but instead of rereading
> > > > > > the first message, as expected from an "earliest" configured
> > > > > > application, my application reads the latest message, commits
> > > > > > it, and ignores the one in error, thus I have no lag!
> > > > > > 3. When I start my application while there are uncommitted
> > > > > > messages, my application reads the FIRST uncommitted message, as
> > > > > > if it IS an "earliest" configured application!
> > > > > >
> > > > > > In your documentation you assure "at least once" behavior, but
> > > > > > according to section 2 it turns out that my application does not
> > > > > > receive those messages even once (as I said, those messages are
> > > > > > uncommitted).
> > > > > >
> > > > > > My guess is that it has something to do with the stream's
> > > > > > cache... I would very much like to have an explanation or even a
> > > > > > solution.
> > > > > >
> > > > > > I'm turning to you as a last resort, after long weeks of
> > > > > > research and experiments.
> > > > > >
> > > > > > Thanks a lot
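
For anyone reading this thread later, here is a rough sketch of the two rules Adam describes: the "earliest"/"latest" reset policy is consulted only when the consumer group has no committed offset, and offsets are committed per batch, so a crash on a bad message replays the batch on restart. This is a toy in-memory model in plain Python, not the Kafka client or Kafka Streams API. The names Broker, start_offset, run_once and strict_handler are made up for illustration, and it does not try to reproduce the behaviour Yui reports, only the behaviour the replies say should be expected.

```python
from dataclasses import dataclass, field
from typing import Callable, Dict, List


@dataclass
class Broker:
    """Toy single-partition log plus per-group committed offsets (hypothetical)."""
    log: List[str] = field(default_factory=list)
    committed: Dict[str, int] = field(default_factory=dict)


def start_offset(broker: Broker, group: str, reset: str) -> int:
    """Use the committed offset if one exists; otherwise apply the reset policy."""
    if group in broker.committed:
        return broker.committed[group]
    return 0 if reset == "earliest" else len(broker.log)


def run_once(broker: Broker, group: str, reset: str,
             handle: Callable[[str], None], batch_size: int = 10) -> List[str]:
    """Run one application instance until the log is drained or a handler fails.

    Offsets are committed only after a whole batch succeeds. A ValueError from
    the handler models the app shutting down: nothing from that batch is
    committed, so the next run starts from the same offset.
    """
    processed: List[str] = []
    position = start_offset(broker, group, reset)
    while position < len(broker.log):
        batch = broker.log[position:position + batch_size]
        try:
            for message in batch:
                handle(message)
                processed.append(message)
        except ValueError:
            return processed  # simulated crash, no commit for this batch
        position += len(batch)
        broker.committed[group] = position
    return processed


def strict_handler(message: str) -> None:
    """Reject the invalid message from the thread; accept everything else."""
    if message == "asd":
        raise ValueError("invalid message: " + message)


broker = Broker(log=["m0", "m1", "asd", "{}"])

# Instance 1 crashes on "asd"; "m0" and "m1" were handled but not committed.
first = run_once(broker, "test-group", "earliest", strict_handler)

# Instance 2 replays the same batch (at-least-once) and is blocked again.
second = run_once(broker, "test-group", "earliest", strict_handler)

# An instance that skips bad messages gets through and commits the batch.
third = run_once(broker, "test-group", "earliest", lambda message: None)

# With a committed offset in place, "earliest" no longer applies.
broker.log.append("new")
fourth = run_once(broker, "test-group", "earliest", lambda message: None)
```

In this model a bad message blocks the partition for as long as the handler keeps throwing, which matches John's description; "{}" is only reached once the handler skips "asd" instead.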