How does Kafka Consumer send JoinRequest?

2023-11-26 Thread Debraj Manna
Can someone tell me whether the consumer sends the JoinGroup request from the
polling/user thread or from the background heartbeat thread?

If the JoinGroup request is sent from the polling/user thread, then whenever
processing in that thread takes longer than max.poll.interval.ms, the
consumer will remain disconnected from the broker for that long. For
example, if max.poll.interval.ms is 300000 (300 sec) and processing in the
poll thread takes 15 minutes, then for those 15 minutes the partition this
consumer was polling will remain idle and no messages will be consumed from
it. Is my understanding correct?

I am using Kafka client 3.5.1 with Apache Kafka broker 2.8.1, with all
consumer configs left at their defaults.


Re: How does Kafka Consumer send JoinRequest?

2023-11-26 Thread Haruki Okada
Hi.

The JoinGroup request is sent from the polling/user thread.
In your example, the consumer instance will be removed from the group
because it did not call poll() again, and so did not rejoin, within the
timeout. The partition will then be assigned to another consumer in the
group and processed there.
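
To make the timing concrete, here is a minimal sketch in plain Java (no Kafka dependency). The config keys max.poll.interval.ms and max.poll.records are real consumer settings, and 300000 ms and 500 are their defaults; the class and helper names are hypothetical and only illustrate the arithmetic, not the client's internals.

```java
import java.time.Duration;
import java.util.Properties;

// Hypothetical helper: compares a processing time against max.poll.interval.ms.
class PollIntervalSketch {

    // Builds consumer overrides. The keys are real consumer config names.
    public static Properties consumerOverrides(Duration maxPollInterval, int maxPollRecords) {
        Properties props = new Properties();
        props.setProperty("max.poll.interval.ms", Long.toString(maxPollInterval.toMillis()));
        props.setProperty("max.poll.records", Integer.toString(maxPollRecords));
        return props;
    }

    // True if handling one batch would take longer than the allowed gap between poll() calls.
    public static boolean exceedsPollInterval(Properties props, Duration processingTime) {
        long limitMs = Long.parseLong(props.getProperty("max.poll.interval.ms"));
        return processingTime.toMillis() > limitMs;
    }

    public static void main(String[] args) {
        // Defaults: 5 minutes between polls, up to 500 records per poll.
        Properties defaults = consumerOverrides(Duration.ofMinutes(5), 500);
        System.out.println(exceedsPollInterval(defaults, Duration.ofMinutes(15)));

        // One possible mitigation (an assumption, tune for your workload):
        // allow a longer gap and fetch fewer records per poll.
        Properties raised = consumerOverrides(Duration.ofMinutes(20), 50);
        System.out.println(exceedsPollInterval(raised, Duration.ofMinutes(15)));
    }
}
```

With the defaults, 15 minutes of processing exceeds the limit, so the consumer would be removed from the group; raising max.poll.interval.ms or lowering max.poll.records are the usual knobs to avoid that.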



-- 

Okada Haruki
ocadar...@gmail.com



Re: [VOTE] 3.6.1 RC0

2023-11-26 Thread Jakub Scholz
+1 non-binding. I used the staged Scala 2.13 artifacts and the staged Maven
repo for my tests. All seems to work fine.

Thanks
Jakub

On Fri, Nov 24, 2023 at 4:37 PM Mickael Maison wrote:

> Hello Kafka users, developers and client-developers,
>
> This is the first candidate for release of Apache Kafka 3.6.1.
>
> This is a bugfix release with several fixes, including dependency
> version bumps for CVEs.
>
> Release notes for the 3.6.1 release:
> https://home.apache.org/~mimaison/kafka-3.6.1-rc0/RELEASE_NOTES.html
>
> *** Please download, test and vote by Friday, December 1
>
> Kafka's KEYS file containing PGP keys we use to sign the release:
> https://kafka.apache.org/KEYS
>
> * Release artifacts to be voted upon (source and binary):
> https://home.apache.org/~mimaison/kafka-3.6.1-rc0/
>
> * Maven artifacts to be voted upon:
> https://repository.apache.org/content/groups/staging/org/apache/kafka/
>
> * Javadoc:
> https://home.apache.org/~mimaison/kafka-3.6.1-rc0/javadoc/
>
> * Tag to be voted upon (off 3.6 branch) is the 3.6.1 tag:
> https://github.com/apache/kafka/releases/tag/3.6.1-rc0
>
> PR for updating docs:
> https://github.com/apache/kafka-site/pull/568
>
> * Documentation:
> https://kafka.apache.org/36/documentation.html
>
> * Protocol:
> https://kafka.apache.org/36/protocol.html
>
> * Successful Jenkins builds for the 3.6 branch:
> Unit/integration tests: We still have a lot of flaky tests in the 3.6
> branch. Looking at the last few 3.6 builds in
> https://ci-builds.apache.org/job/Kafka/job/kafka/job/3.6/ it seems all
> tests passed at least once apart from
> ClusterConnectionStatesTest.testSingleIP(). There's
> https://issues.apache.org/jira/browse/KAFKA-15762 to fix that test.
> System tests: Still running; I'll post an update once they complete.
>
> Thanks,
> Mickael
>


Re: GlobalKTable with RocksDB - queries before state RUNNING?

2023-11-26 Thread Sophie Blee-Goldman
Ah, yeah, IQ v2 was a pretty big feature so it hasn't yet been implemented
across all parts of Kafka Streams. You'll notice that we're still actively
putting out new KIPs trying to complete this feature. I don't think there's
any particular reason that Global KTables can't be made to work with IQ v2,
but no one's gotten around to doing it yet. I did a quick search and
couldn't even find a JIRA ticket for this, so it doesn't seem to be on
anyone's radar.

If this is something you'd like to see implemented, go ahead and file a
ticket for it on JIRA. Honestly, it may have been overlooked completely.
Sometimes the global table stuff gets overshadowed since it's relatively
less common, so a ticket is the best way to kick off the conversation.

I'm not saying it'll get picked up right away, but if there's no ticket and
no one asking for it, then it might never happen at all. Of course, we
accept KIPs -- if you really need this feature, or if you've ever been
interested in contributing to Apache Kafka/Kafka Streams, consider
implementing it yourself!

On Wed, Nov 22, 2023 at 12:26 AM Christian Zuegner wrote:

> Hi Sophie,
>
> thanks a lot for your tip! I've implemented a StateListener to block
> queries while the state is not RUNNING. This works perfectly for our
> use case!
>
> In the meantime I noticed the Interactive Query API v2 and gave it a try.
> Unfortunately it does not seem to cope with GlobalKTable. When I run this:
>
> return streams.query(StateQueryRequest.inStore(STORE_NAME).withQuery(KeyQuery.withKey(key)));
>
> I get: "Global stores do not yet support the KafkaStreams#query API. Use
> KafkaStreams#store instead."
>
> From my point of view it would be great if this worked and behaved as it
> does with the IN_MEMORY StoreType, since it is straightforward to use.
>
> Do you see a chance of getting Interactive Query v2 to work with GlobalKTable?
>
> Kind regards,
> Christian
>
> -----Original Message-----
> From: Sophie Blee-Goldman 
> Sent: Wednesday, November 22, 2023 1:51 AM
> To: christian.zueg...@ams-osram.com.invalid
> Cc: users@kafka.apache.org
> Subject: Re: GlobalKTable with RocksDB - queries before state RUNNING?
>
> Just to make sure I understand the logs, you're saying the "new file
> processed" lines represent store queries, and presumably the
> com.osr.serKafkaStreamsService is your service that's issuing these queries?
>
> You need to wait for the app to finish restoring state before querying it.
> Based on this message -- "KafkaStreams has not been started, you can retry
> after calling start()" -- I assume you're kicking off the querying service
> right away and blocking queries until after KafkaStreams#start is called.
> But you need to wait for it to actually finish starting up, not just for
> start() to be called. The best way to do this is by setting a state
> listener via KafkaStreams#setStateListener, and then using this to listen
> in on the KafkaStreams.State and blocking the queries until the state has
> changed to RUNNING.
>
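The state-listener approach described above could look roughly like this. It is a minimal sketch in plain Java so that it runs without Kafka on the classpath; QueryGate and its methods are hypothetical names. In a real application, onChange would be invoked from the listener passed to KafkaStreams#setStateListener (which must be set before start() is called), as shown in the comment.

```java
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

// Hypothetical gate that rejects store queries until the app reports RUNNING.
class QueryGate {
    private final AtomicBoolean running = new AtomicBoolean(false);

    // Assumed wiring, done before streams.start():
    //   streams.setStateListener((newState, oldState) -> gate.onChange(newState.name()));
    public void onChange(String newStateName) {
        running.set("RUNNING".equals(newStateName));
    }

    public boolean isRunning() {
        return running.get();
    }

    // Runs the lookup only while RUNNING; otherwise returns empty so the caller can retry later.
    public <T> Optional<T> query(Supplier<T> storeLookup) {
        if (!running.get()) {
            return Optional.empty();
        }
        return Optional.ofNullable(storeLookup.get());
    }

    public static void main(String[] args) {
        QueryGate gate = new QueryGate();
        System.out.println(gate.query(() -> "value").isPresent()); // still restoring
        gate.onChange("RUNNING");
        System.out.println(gate.query(() -> "value").isPresent()); // ready to serve
    }
}
```

Returning an empty Optional instead of blocking is a design choice for the sketch; a service could equally wait on a latch or return an HTTP 503 until the state reaches RUNNING.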
> In case you're curious about why this seems to work with in-memory stores
> but not with rocksdb, it seems like in the in-memory case, the queries that
> are attempted during restoration are blocked due to the store being closed
> (according to "(Quarkus Main Thread) the state store, store-name, is not
> open.")
>
> So why is the store closed for most of the restoration in the in-memory
> case only? This gets a bit into the weeds, but it has to do with the
> sequence of events in starting up a state store. When the global thread
> starts up, it'll first loop over all its state stores and call #init on
> them. Two things have to happen inside #init: the store is opened, and the
> store registers itself with the ProcessorContext. The #register involves
> various things, including a call to fetch the end offsets of the topic for
> global state stores. This is a blocking call, so the store might stay
> inside the #register call for a relatively long while.
>
> For RocksDB stores, we open the store first and then call #register, so by
> the time the GlobalStreamThread is sitting around waiting on the end
> offsets response, the store is open and your queries are getting through to
> it. However the in-memory store actually registers itself *first*, before
> marking itself as open, and so it remains closed for most of the time it
> spends in restoration and blocks any query attempts during this time.
>
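The ordering difference described above can be illustrated with a toy model. This is only a sketch: ToyStore and its methods are hypothetical and are not the actual Kafka Streams store classes. The register step stands in for the slow, blocking end-offsets fetch, and the probe plays the role of a query arriving while that step is in progress.

```java
import java.util.function.BooleanSupplier;

// Toy model of the two init orders; not the real store implementations.
class InitOrderSketch {

    static class ToyStore {
        private volatile boolean open = false;

        public boolean isOpen() {
            return open;
        }

        // Stands in for the blocking #register call; the probe runs "during" it.
        private boolean register(BooleanSupplier probeDuringRegister) {
            return probeDuringRegister.getAsBoolean();
        }

        // RocksDB-like order: open first, then register.
        public boolean initOpenFirst() {
            open = true;
            return register(this::isOpen);
        }

        // In-memory-like order: register first, then mark open.
        public boolean initRegisterFirst() {
            boolean seenOpenDuringRegister = register(this::isOpen);
            open = true;
            return seenOpenDuringRegister;
        }
    }

    public static void main(String[] args) {
        System.out.println("open-first, queryable during register: "
                + new ToyStore().initOpenFirst());
        System.out.println("register-first, queryable during register: "
                + new ToyStore().initRegisterFirst());
    }
}
```

In the open-first order a query arriving during registration finds the store open, which matches the RocksDB behaviour described above; in the register-first order it finds the store closed.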
> I suppose it would make sense to align the two store implementations to
> have the same behavior, and the in-memory store is probably technically
> more correct. But in the end you really should just wait for the
> KafkaStreams.State to get to RUNNING before querying the state store, as
> that's the