Hi Yui Yoi

Keep in mind that Kafka consumers don't traditionally request a single
message at a time; they request them in batches. This allows for much
higher throughput, but it is also what leads to "at-least-once"
processing. Generally, what happens in this scenario is the following:

1) Client requests the next set of messages from offset (t). For example,
assume it gets 10 messages and message 6 is "bad".
2) The client's processor then processes the messages one at a time. Note
that the offsets are not committed after each message is processed, but
only at the end of the batch.
3) The bad message is hit by the processor. At this point you can decide to
skip the message, throw an exception, etc.
4a) If you decide to skip the message, processing will continue. Once all
10 messages are processed, the new offset (t+10) is committed back to
Kafka.
4b) If you decide to throw an exception and terminate your app, you will
still have processed the messages that came before the bad one. Because the
offset (t+10) is not committed, the next time you start the app it will
consume from offset t, and those messages will be processed again. This is
"at-least-once" processing (see the sketch just below for these mechanics
in code).


Now, if you need exactly-once processing, you have two choices:
1) Use Kafka Streams with exactly-once semantics (though, as I am not
familiar with your framework, it may support this as well).
2) Use idempotent practices (i.e., it doesn't matter if the same message
gets processed more than once).
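
For (1), and assuming you can run Kafka Streams directly, exactly-once is
mostly a single config. A minimal sketch (the application id and bootstrap
servers are placeholders, not taken from your project):

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class EosConfigSketch {
    public static Properties streamsProps() {
        Properties p = new Properties();
        p.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-eos-app"); // placeholder
        p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Enables transactions under the hood: consume-process-produce is atomic
        p.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
        return p;
    }
}

For (2), a common pattern is to key every write by a unique message id, so
that reprocessing simply overwrites the previous result instead of
duplicating it.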


Hope this helps -

Adam


On Wed, Sep 12, 2018 at 7:59 AM, Yui Yoi <shalosh...@gmail.com> wrote:

> Hi Adam,
> Thanks a lot for the rapid response, it did help!
>
> Let me ask one more simple question, though: can I make a streams
> application get stuck on an invalid message and not consume any further
> messages?
>
> Thanks again
>
> On Wed, Sep 12, 2018 at 2:35 PM Adam Bellemare <adam.bellem...@gmail.com>
> wrote:
>
> > Hi Yui Yoi
> >
> > Preface: I am not familiar with the spring framework.
> >
> > "Earliest" when it comes to consuming from Kafka means, "Start reading
> from
> > the first message in the topic, *if there is no offset stored for that
> > consumer group*". It sounds like you are expecting it to re-read each
> > message whenever a new message comes in. This is not going to happen, as
> > there will be a committed offset and "earliest" will no longer be used.
> If
> > you were to use "latest" instead, if a consumer is started that does not
> > have a valid offset, it would use the very latest message in the topic as
> > the starting offset for message consumption.
> >
> > Now, if you are using the same consumer group each time you run the
> > application (which it seems is true, as you have "test-group" hardwired
> > in your application.yml), but you do not tear down your local cluster and
> > clear out its state, you will indeed see the behaviour you describe.
> > Remember that Kafka is durable, and maintains the offsets when the
> > individual applications go away. So you are probably seeing this:
> >
> > 1) start application instance 1. It realizes it has no offset when it
> > tries to register as a consumer on the input topic, so it creates a new
> > consumer entry for "earliest" for your consumer group.
> > 2) send message "asd"
> > 3) application instance 1 receives "asd", processes it, and updates the
> > offset (offset head = 1)
> > 4) Terminate instance 1
> > 5) Start application instance 2. It detects correctly that consumer group
> > "test-group" is available and reads that offset as its starting point.
> > 6) send message "{}"
> > 7) application instance 2 receives "{}", processes it, and updates the
> > offset (offset head = 2)
> > *NOTE:* App instance 2 NEVER received "asd", nor should it, as it is
> > telling the Kafka cluster that it belongs to the same consumer group as
> > application 1.
> >
> > Hope this helps,
> >
> > Adam
> >
> >
> >
> >
> >
> > On Wed, Sep 12, 2018 at 6:57 AM, Yui Yoi <shalosh...@gmail.com> wrote:
> >
> > > TL;DR:
> > > my streams application skips uncommitted messages
> > >
> > > Hello,
> > > I'm using the streams API via the Spring framework and experiencing a
> > > weird behavior which I would like to get an explanation for.
> > > First of all: the attached zip is my test project; I used the Kafka CLI
> > > to run a localhost broker and ZooKeeper.
> > >
> > > What is happening is as follows:
> > > 1. I send an invalid message, such as "asd", and my consumer has a lag
> > > and an error message, as expected.
> > > 2. I send a valid message, such as "{}", but instead of rereading the
> > > first message, as expected from an "earliest"-configured application,
> > > my application reads the latest message, commits it, and ignores the
> > > one in error; thus I have no lag!
> > > 3. When I run my application while there are uncommitted messages, my
> > > application reads the FIRST uncommitted message, as if it IS an
> > > "earliest"-configured application!
> > >
> > > In your documentation you promise "at-least-once" behavior, but
> > > according to point 2 above, my application does not receive those
> > > messages even once (as I said, those messages are uncommitted).
> > >
> > > My guess is that it has something to do with the stream's cache... I
> > > would very much like an explanation or even a solution.
> > >
> > > I'm turning to you as a last resort, after long weeks of research and
> > > experiments
> > >
> > > Thanks a lot
> > >
> >
>
