Re: A question about kafka streams API
Hi Adam and John, thank you for your effort! We are implementing full
idempotency in our projects, so that's nothing to worry about. As to what
John said - we only have one partition; I personally made sure of that. So,
as I wrote in step 2 of my first message in this conversation, my stream
should have processed the "asd" message again, because it is not committed
yet. That's why I suspect it has something to do with the stream's cache;
maybe something like:

1. "asd" got processed and stored in the cache.
2. "{}" got processed and cached too.
3. The commit interval makes the stream commit the offset of "{}".

By the way, if you want to run my application you should:

1. Open it in some Java editor as a Maven project.
2. Run it as a normal Java application.
3. Set up a Kafka server & ZooKeeper on localhost.
4. Then you can send the above messages via the CLI.

John - even if you send "asd1", "asd2", "asd3", you will see in the logs
that my app takes the latest one each time.

Of course that's far beyond what I can ask you guys to do; thanks a lot for
your help.

On Wed, Sep 12, 2018 at 8:14 PM John Roesler wrote:

> Hi!
>
> As Adam said, if you throw an exception during processing, it should cause
> Streams to shut itself down and *not* commit that message. Therefore, when
> you start up again, it should again attempt to process that same message
> (and shut down again).
>
> Within a single partition, messages are processed in order, so a bad
> message will block the queue, and you should not see subsequent messages
> get processed.
>
> However, if your later message "{}" goes to a different partition than the
> bad message, then there's no relationship between them, and the later,
> good, message might get processed.
>
> Does that help?
> -John
>
> On Wed, Sep 12, 2018 at 8:38 AM Adam Bellemare wrote:
>
> > Hi Yui Yoi
> >
> > Keep in mind that Kafka consumers don't traditionally request only a
> > single message at a time, but instead request them in batches. This
> > allows for much higher throughput, but does result in the scenario of
> > "at-least-once" processing. Generally what will happen in this scenario
> > is the following:
> >
> > 1) The client requests the next set of messages from offset (t). For
> > example, assume it gets 10 messages and message 6 is "bad".
> > 2) The client's processor will then process the messages one at a time.
> > Note that the offsets are not committed after each message is
> > processed, but only at the end of the batch.
> > 3) The bad message is hit by the processor. At this point you can
> > decide to skip the message, throw an exception, etc.
> > 4a) If you decide to skip the message, processing will continue. Once
> > all 10 messages are processed, the new offset (t+10) is committed back
> > to Kafka.
> > 4b) If you decide to throw an exception and terminate your app, you
> > will still have processed the messages that came before the bad
> > message. Because the offset (t+10) is not committed, the next time you
> > start the app it will consume from offset t, and those messages will be
> > processed again. This is "at-least-once" processing.
> >
> > Now, if you need exactly-once processing, you have two choices:
> > 1) Use Kafka Streams with exactly-once semantics (though, as I am not
> > familiar with your framework, it may support it as well).
> > 2) Use idempotent practices (i.e., it doesn't matter if the same
> > messages get processed more than once).
> >
> > Hope this helps -
> >
> > Adam
> >
> > On Wed, Sep 12, 2018 at 7:59 AM, Yui Yoi wrote:
> >
> > > Hi Adam,
> > > Thanks a lot for the rapid response, it did help!
> > >
> > > Let me ask one more simple question, though: can I make a streams
> > > application get stuck on an invalid message, and not consume any
> > > further messages?
> > >
> > > Thanks again
> > >
> > > On Wed, Sep 12, 2018 at 2:35 PM Adam Bellemare <
> > > adam.bellem...@gmail.com> wrote:
> > >
> > > > Hi Yui Yoi
> > > >
> > > > Preface: I am not familiar with the Spring framework.
> > > >
> > > > "Earliest", when it comes to consuming from Kafka, means "start
> > > > reading from the first message in the topic, *if there is no offset
> > > > stored for that consumer group*". It sounds like you are expecting
> > > > it to re-read each message whenever a new message comes in. [...]
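For reference, the knobs discussed in this message - the commit interval,
the record cache suspected above, and the exactly-once processing guarantee
(option 1 in Adam's list) - are all plain Kafka Streams configuration
properties. A minimal sketch of where they would be set follows; the
application id, broker address, topic names, and values are illustrative
placeholders, not taken from the attached project:

    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;

    public class StreamsConfigSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-group"); // placeholder
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
            props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

            // Offsets are committed on this interval, not after every record,
            // which is why a swallowed error can still end up behind a
            // committed offset.
            props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 30000);

            // The record cache suspected in the message above; 0 disables it.
            props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024);

            // Option 1 from Adam's message: exactly-once semantics.
            props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);

            // Applies only when the consumer group has no committed offset yet.
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

            StreamsBuilder builder = new StreamsBuilder();
            builder.stream("input-topic").to("output-topic"); // placeholder topology
            new KafkaStreams(builder.build(), props).start();
        }
    }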
Re: A question about kafka streams API
Hi Adam,
Thanks a lot for the rapid response, it did help!

Let me ask one more simple question, though: can I make a streams
application get stuck on an invalid message, and not consume any further
messages?

Thanks again

On Wed, Sep 12, 2018 at 2:35 PM Adam Bellemare wrote:

> Hi Yui Yoi
>
> Preface: I am not familiar with the Spring framework.
>
> "Earliest", when it comes to consuming from Kafka, means "start reading
> from the first message in the topic, *if there is no offset stored for
> that consumer group*". It sounds like you are expecting it to re-read
> each message whenever a new message comes in. This is not going to
> happen, as there will be a committed offset and "earliest" will no
> longer be used. If you were to use "latest" instead, a consumer that
> starts without a valid offset would use the very latest message in the
> topic as the starting offset for message consumption.
>
> Now, if you are using the same consumer group each time you run the
> application (which it seems is true, as you have "test-group" hardwired
> in your application.yml), but you do not tear down your local cluster
> and clear out its state, you will indeed see the behaviour you describe.
> Remember that Kafka is durable and maintains the offsets when the
> individual applications go away. So you are probably seeing this:
>
> 1) Start application instance 1. It realizes it has no offset when it
> tries to register as a consumer on the input topic, so it creates a new
> consumer entry for "earliest" for your consumer group.
> 2) Send message "asd".
> 3) Application instance 1 receives "asd", processes it, and updates the
> offset (offset head = 1).
> 4) Terminate instance 1.
> 5) Start application instance 2. It detects correctly that consumer
> group "test-group" is available and reads that offset as its starting
> point.
> 6) Send message "{}".
> 7) Application instance 2 receives "{}", processes it, and updates the
> offset (offset head = 2).
> *NOTE:* App instance 2 NEVER received "asd", nor should it, as it is
> telling the Kafka cluster that it belongs to the same consumer group as
> application 1.
>
> Hope this helps,
>
> Adam
>
> On Wed, Sep 12, 2018 at 6:57 AM, Yui Yoi wrote:
>
> > TL;DR: my streams application skips uncommitted messages
> >
> > Hello,
> > I'm using the Streams API via the Spring framework and am experiencing
> > a weird behavior which I would like to get an explanation for.
> > First of all: the attached zip is my test project; I used the Kafka
> > CLI to run a localhost broker and ZooKeeper.
> >
> > What is happening is as follows:
> > 1. I send an invalid message, such as "asd", and my consumer shows a
> > lag and an error message, as expected.
> > 2. I send a valid message such as "{}", but instead of re-reading the
> > first message, as expected from an "earliest"-configured application,
> > my application reads the latest message, commits it, and ignores the
> > one in error - thus I have no lag!
> > 3. When I run my application while there are uncommitted messages, my
> > application reads the FIRST uncommitted message, as if it IS an
> > "earliest"-configured application!
> >
> > In your documentation you assure "at least once" behavior, but
> > according to point 2 my application does not receive those messages
> > even once (as I said, those messages are uncommitted).
> >
> > My guess is that it has something to do with the stream's cache... I
> > would very much like to have an explanation or even a solution.
> >
> > I'm turning to you as a last resort, after long weeks of research and
> > experiments.
> >
> > Thanks a lot
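For reference, the "earliest only applies when no offset is stored" rule
Adam describes can be seen with a bare consumer as well, outside of Streams.
A small sketch, assuming a local broker and placeholder topic and group
names:

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class OffsetResetSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group"); // same group across restarts
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            // Used ONLY when "test-group" has no committed offset for the
            // partition; after the first commit, restarts resume from the
            // committed offset and this setting is ignored.
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("input-topic"));
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset=%d value=%s%n", record.offset(), record.value());
                }
                consumer.commitSync(); // commits the whole batch's offsets at once
            }
        }
    }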
A question about kafka streams API
TL;DR: my streams application skips uncommitted messages

Hello,
I'm using the Streams API via the Spring framework and am experiencing a
weird behavior which I would like to get an explanation for.
First of all: the attached zip is my test project; I used the Kafka CLI to
run a localhost broker and ZooKeeper.

What is happening is as follows:
1. I send an invalid message, such as "asd", and my consumer shows a lag
and an error message, as expected.
2. I send a valid message such as "{}", but instead of re-reading the first
message, as expected from an "earliest"-configured application, my
application reads the latest message, commits it, and ignores the one in
error - thus I have no lag!
3. When I run my application while there are uncommitted messages, my
application reads the FIRST uncommitted message, as if it IS an
"earliest"-configured application!

In your documentation you assure "at least once" behavior, but according to
point 2 my application does not receive those messages even once (as I
said, those messages are uncommitted).

My guess is that it has something to do with the stream's cache... I would
very much like to have an explanation or even a solution.

I'm turning to you as a last resort, after long weeks of research and
experiments.

Thanks a lot
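For reference: the "error is logged, record is skipped, no lag remains"
behaviour described in point 2 is what Kafka Streams' deserialization
exception handler setting controls, so it is worth checking which handler
the attached project configures; whether this is actually the cause cannot
be confirmed from the thread alone. A minimal sketch:

    import java.util.Properties;

    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;

    public class DeserializationHandlerSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            // The default handler, LogAndFailExceptionHandler, logs the bad
            // record and shuts the application down WITHOUT committing its
            // offset, so the record is seen again on restart.
            // LogAndContinueExceptionHandler, shown here, logs the error,
            // skips the record, and keeps consuming - the bad record's offset
            // is eventually committed, so there is no lag and the record is
            // never redelivered, matching the behaviour described above.
            props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
                      LogAndContinueExceptionHandler.class);
        }
    }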