Fwd: Kafka Streams Commits

2021-04-20 Thread Geetish Nayak
Hello,

I have a query with respect to *commits* in Kafka Streams. From what I
understand, *commits to the broker are done by default every 30 seconds* by
Kafka Streams, and the committed offsets increase as a result. We have been
seeing that these commits happen only for a short period of time (only during
the first few minutes after startup), after which the offsets do not increase
at all *[commits.png]*. In the graph you can see that *Messages Consumed Per
Minute* suddenly drops, since commits are not being processed and the offsets
do not increase.
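
For reference, a minimal sketch of the setting in question, assuming the
standard Streams configuration keys (30000 ms is the documented default for
at-least-once processing):

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;

    public class CommitIntervalConfig {
        public static Properties props() {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app"); // hypothetical id
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            // commit.interval.ms: how often Streams commits consumed offsets;
            // defaults to 30000 ms (30 s) unless exactly-once is enabled.
            props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 30_000);
            return props;
        }
    }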

We see that processing is happening in the consumers (in the final step we
persist data in our database), and we have metrics that show that records are
being read from the topics *[consumptionByTopic-min.png]*.

Can you help us understand why commits are not being processed by the brokers,
even though the consumers are consuming records and should be sending commits
to the broker at the default interval of 30 seconds?

Thanks and Regards,
Geetish


Re: Kafka Stream: State replication seems unpredictable.

2021-04-20 Thread mangat rai
Hey Guozhang,

Thanks for creating the issue. Yes, you are right, this will happen only with
consecutive rebalances, as after some time the zombie thread will stop and
re-join the group, and the new thread will always overwrite the state with the
latest data. In our poor infra setup, rebalancing was happening many times in
a row.

Now, we can't guarantee that consecutive rebalances will not happen again (we
reduced the fetch size, which largely fixed it), so would either of the
following work as a workaround?

1. Use a persistent store instead of an in-memory one. The new thread will
never get the lock, so we will lose availability but keep things consistent
(see the sketch after this list).
2. Use exactly-once semantics. However, we might need to redesign our apps;
it's a bigger change.
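
A rough sketch of both options (application, topic, and store names are
hypothetical; the aggregation is a stand-in for the real topology):

    import java.util.Properties;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.kstream.Materialized;
    import org.apache.kafka.streams.state.Stores;

    public class WorkaroundSketch {
        public static void main(String[] args) {
            // Option 2: exactly-once semantics. The broker fences the zombie
            // thread's transactional producer, so its interleaved writes are
            // never committed.
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "agg-app");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
                      StreamsConfig.EXACTLY_ONCE);

            // Option 1: a persistent (RocksDB-backed) store. The state
            // directory file lock keeps a second thread on the same node from
            // opening the store while the zombie still holds it.
            StreamsBuilder builder = new StreamsBuilder();
            builder.<String, String>stream("input-topic")
                   .groupByKey()
                   .reduce((oldValue, newValue) -> newValue,
                           Materialized.<String, String>as(
                                   Stores.persistentKeyValueStore("agg-store"))
                               .withKeySerde(Serdes.String())
                               .withValueSerde(Serdes.String()));
        }
    }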

Regards,
Mangat

On Tue, Apr 20, 2021 at 6:50 AM Guozhang Wang  wrote:

> Hello Mangat,
>
> What you've encountered is a "zombie writer" issue; that is, Thread-2 did
> not know there had already been a new rebalance and hence that its
> partitions had been migrated out, until it tried to commit, got notified of
> the illegal-generation error, and realized it was already the "zombie".
> This case would still persist even with incremental rebalancing.
>
> I've filed https://issues.apache.org/jira/browse/KAFKA-12693 to summarize
> the situation. Please LMK if that explanation is clear to you.
>
> On Mon, Apr 19, 2021 at 12:58 AM mangat rai  wrote:
>
> > Thanks, Guozhang,
> >
> > I have been wrestling with Kafka's various consumer rebalancing
> > algorithms for the last 2 days. Could I generalize this problem as:
> >
> > *Any in-memory state store backed by a changelog topic will always risk
> > having interleaved writes from two different writers during rebalancing?*
> >
> > In our case, CPU throttling made it worse, as thread-2 didn't try to
> > commit for a long time. Also,
> >
> > 1. Do you think that if we disable incremental rebalancing, we will not
> > have this issue? If I understood correctly, Thread-4 will not start
> > processing until the state is completely transferred from Thread-2.
> > 2. If yes, how can we disable it without downgrading the client?
> >
> > Since we operate at a very low scale and have no real-time computing
> > requirement, we will be happy to sacrifice availability for consistency.
> >
> > Regards,
> > Mangat
> >
> >
> >
> > On Sat, Apr 17, 2021 at 12:27 AM Guozhang Wang  wrote:
> >
> > > Hi Mangat:
> > >
> > > I think I found the cause of your problem.
> > >
> > > It seems thread-2's partition was assigned to thread-4 while thread-2
> > > was not aware (because it missed a rebalance; this is a normal
> > > scenario); in other words, thread-2 became a "zombie". It would stay in
> > > that zombie state until it tried to commit, at which point it would get
> > > an error from the brokers, realize its zombie identity, and re-join the
> > > group.
> > >
> > > During that period of time, before the commit was issued, it would
> > > continue trying to write to its local states; here are several
> > > scenarios:
> > >
> > > 1) If thread-2 and thread-4 belong to two different nodes, that is
> > > fine, since they will write to different local state stores.
> > > 2) If they belong to the same node, and
> > > a) the state stores are persistent, then there would be a risk of
> > > contention; this is guarded by the state directory locks (as file
> > > locks), in which case the new owner, thread-4, should not be able to
> > > get at the local state files.
> > > b) the state stores are in-memory, in which case that is fine, since
> > > the in-memory stores are kept separate as well.
> > >
> > > In your case, 2.b), the issue is that the changelog would still be
> > > shared between the two --- and note that this is true in case 1) as
> > > well. This means that at that time the changelog is shared by two
> > > writers sending records interleaved. If there is a tombstone that was
> > > intended for a record A, but it was written interleaved and there is
> > > another record B (for the same key) in between, that tombstone would
> > > effectively delete record B. The key point is that when we replay the
> > > changelogs, we replay them completely, following offset ordering.
> > >
> > >
> > >
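
A small illustration of the replay behavior described above (keys, values,
and offsets are hypothetical; the point is only that restoration applies
changelog records strictly in offset order):

    import java.util.HashMap;
    import java.util.Map;

    public class ChangelogReplayExample {
        public static void main(String[] args) {
            // Interleaved changelog writes, in offset order:
            // offset 0: thread-2 writes K -> "A"
            // offset 1: thread-4 (the new owner) writes K -> "B"
            // offset 2: thread-2 (the zombie) writes a tombstone meant for "A"
            String[][] changelog = { {"K", "A"}, {"K", "B"}, {"K", null} };

            // Restoring the store replays the records in offset order.
            Map<String, String> store = new HashMap<>();
            for (String[] record : changelog) {
                if (record[1] == null) {
                    store.remove(record[0]); // deletes "B", not "A"
                } else {
                    store.put(record[0], record[1]);
                }
            }
            System.out.println(store); // prints {} -- record "B" was lost
        }
    }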
> > > On Thu, Apr 15, 2021 at 2:28 AM mangat rai  wrote:
> > >
> > > > Guozhang,
> > > >
> > > > Yes, you are correct. We have our own group processor. I have more
> > > > information now.
> > > >
> > > > 1. I added the ThreadId to the data when the app persists into the
> > > > changelog topic.
> > > > 2. Thread-2, which was working with partition-0, had a timeout issue.
> > > > 3. Thread-4 picked up this partition-0, as I can see its Id in the
> > > > changelog.
> > > > 4. *But then Thread-2 and Thread-4 were both writing into partition-0
> > > > of the changelog, and for the same key.*
> > > >
> > > > So I was clearly able to see that two threads were overwriting data
> > > > of one another into the changelog topic


Apache Kafka Streams : Out-of-Order messages & uses of TimeStamp extractor

2021-04-20 Thread Neeraj Vaidya
Hi,
I have asked this on StackOverflow, but will ask it here as well.

I have an Apache Kafka 2.6 producer which writes to topic-A (TA). I also have a
Kafka Streams application which consumes from TA and writes to topic-B (TB). In
the Streams application, I have a custom timestamp extractor which extracts the
timestamp from the message payload.
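
For context, a minimal sketch of such an extractor; the payload type and its
accessor are hypothetical stand-ins for the real message class:

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.streams.processor.TimestampExtractor;

    public class PayloadTimestampExtractor implements TimestampExtractor {

        // Hypothetical payload type carrying its own event time.
        public interface MyEvent {
            long getEventTime(); // epoch millis embedded in the payload
        }

        @Override
        public long extract(ConsumerRecord<Object, Object> record,
                            long partitionTime) {
            Object value = record.value();
            if (value instanceof MyEvent) {
                return ((MyEvent) value).getEventTime();
            }
            // Fall back to the highest timestamp seen so far on the partition.
            return partitionTime;
        }
    }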

For one of my failure handling test cases, I shut down the Kafka cluster while
my applications are running.

When the producer application tries to write messages to TA, it cannot, because
the cluster is down, and hence (I assume) buffers the messages. Let's say it
receives 4 messages m1, m2, m3, m4 in increasing time order (i.e., m1 is first
and m4 is last).

When I bring the Kafka cluster back online, the producer sends the buffered
messages to the topic, but they are not in order. I receive, for example, m2,
then m3, then m1, and then m4.

Why is that? Is it because the buffering in the producer is multi-threaded,
with each thread producing to the topic at the same time?
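
One producer-side detail worth checking (an assumption about the setup, not a
diagnosis): with retries enabled and more than one in-flight request per
connection, a failed batch can be retried after a later batch has already
succeeded, which reorders messages within a partition. Roughly:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.ProducerConfig;

    public class OrderingSensitiveProducerConfig {
        public static Properties props() {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            // Only one unacknowledged batch at a time, so a retry cannot
            // leapfrog an earlier send.
            props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
            // Alternatively, the idempotent producer preserves per-partition
            // order with up to 5 in-flight requests.
            props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
            return props;
        }
    }

Note also that Kafka only orders messages within a partition; if m1..m4 are
keyed onto different partitions, no cross-partition order is defined.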

I assumed that the custom timestamp extractor would help in ordering messages
when consuming them. But it does not. Or maybe my understanding of the
timestamp extractor is wrong.

If not, then what are the specific uses of the timestamp extractor? Just to
associate a timestamp with an event?

I got one solution on SO: to stream all events from TA, using the timestamp
extractor, to another intermediate topic (say TA'). But I am not sure whether
this will cause the events to be reordered based on the extracted timestamp.
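
For what it's worth, a sketch of that suggestion, reusing the extractor
sketched earlier ("TA-prime" is a hypothetical name standing in for TA'):

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.Consumed;

    public class RepipeSketch {
        public static void main(String[] args) {
            StreamsBuilder builder = new StreamsBuilder();
            // Re-pipe TA to an intermediate topic, stamping each record with
            // the time extracted from its payload.
            builder.stream("TA",
                           Consumed.with(Serdes.String(), Serdes.String())
                                   .withTimestampExtractor(
                                       new PayloadTimestampExtractor()))
                   .to("TA-prime");
        }
    }

Note that this assigns timestamps; by itself it does not reorder records
within a partition.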

Regards,
Neeraj


Re: Apache Kafka Streams : Out-of-Order messages & uses of TimeStamp extractor

2021-04-20 Thread Matthias J. Sax
Replied on StackOverflow:
https://stackoverflow.com/questions/67158317/apache-kafka-streams-out-of-order-messages


-Matthias



On 4/20/21 4:21 PM, Neeraj Vaidya wrote:
> Hi,
> I have asked this on StackOverflow, but will ask it here as well.
> 
> I have an Apache Kafka 2.6 producer which writes to topic-A (TA). I also
> have a Kafka Streams application which consumes from TA and writes to
> topic-B (TB). In the Streams application, I have a custom timestamp
> extractor which extracts the timestamp from the message payload.
> 
> For one of my failure handling test cases, I shut down the Kafka cluster
> while my applications are running.
> 
> When the producer application tries to write messages to TA, it cannot,
> because the cluster is down, and hence (I assume) buffers the messages.
> Let's say it receives 4 messages m1, m2, m3, m4 in increasing time order
> (i.e., m1 is first and m4 is last).
> 
> When I bring the Kafka cluster back online, the producer sends the
> buffered messages to the topic, but they are not in order. I receive, for
> example, m2, then m3, then m1, and then m4.
> 
> Why is that? Is it because the buffering in the producer is
> multi-threaded, with each thread producing to the topic at the same time?
> 
> I assumed that the custom timestamp extractor would help in ordering
> messages when consuming them. But it does not. Or maybe my understanding
> of the timestamp extractor is wrong.
> 
> If not, then what are the specific uses of the timestamp extractor? Just
> to associate a timestamp with an event?
> 
> I got one solution on SO: to stream all events from TA, using the
> timestamp extractor, to another intermediate topic (say TA'). But I am not
> sure whether this will cause the events to be reordered based on the
> extracted timestamp.
> 
> Regards,
> Neeraj
>