cadonna merged PR #14746:
URL: https://github.com/apache/kafka/pull/14746
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.
cadonna commented on PR #14746:
URL: https://github.com/apache/kafka/pull/14746#issuecomment-1824090907
Build failures are unrelated.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific
cadonna commented on code in PR #14746:
URL: https://github.com/apache/kafka/pull/14746#discussion_r1397156239
##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchBuffer.java:
##
@@ -185,6 +188,16 @@ void awaitNotEmpty(Timer timer) {
}
}
+
cadonna commented on code in PR #14746:
URL: https://github.com/apache/kafka/pull/14746#discussion_r1397154783
##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchBuffer.java:
##
@@ -52,6 +53,8 @@ public class FetchBuffer implements AutoCloseable {
pri
lucasbru commented on code in PR #14746:
URL: https://github.com/apache/kafka/pull/14746#discussion_r1397061812
##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchBuffer.java:
##
@@ -52,6 +53,8 @@ public class FetchBuffer implements AutoCloseable {
pr
cadonna commented on PR #14746:
URL: https://github.com/apache/kafka/pull/14746#issuecomment-1816006382
> Do you think it is worth adding a ticket about using the
BackgroundEventQueue for the fetches? Let me know if you have a concrete idea
about how to implement this.
Let me think a
cadonna commented on PR #14746:
URL: https://github.com/apache/kafka/pull/14746#issuecomment-1816005251
> I wonder if we could just use a BlockingQueue for the fetchBuffer because
fetchBuffer.poll(time) blocks until non-empty or timeout.
As far as I see from the javadocs of a `Blockin
philipnee commented on PR #14746:
URL: https://github.com/apache/kafka/pull/14746#issuecomment-1815756879
@cadonna - There are some build failures. Can we rerun the tests?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and
philipnee commented on PR #14746:
URL: https://github.com/apache/kafka/pull/14746#issuecomment-1815755117
Hi @cadonna - I was about to reply with the same idea you proposed. I think
that would work. I wonder if we could just use a `BlockingQueue` for the
fetchBuffer because fetchBuffer.poll
kirktrue commented on PR #14746:
URL: https://github.com/apache/kafka/pull/14746#issuecomment-1815469269
Can we add the `ctr` label here for easier discovery?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL abo
cadonna commented on PR #14746:
URL: https://github.com/apache/kafka/pull/14746#issuecomment-1814405730
Thanks for the options, @lucasbru!
In the meanwhile, I also had an idea to store the reference to the
application thread in a wakeupable task similar to your `FetchAction` task. A
lucasbru commented on PR #14746:
URL: https://github.com/apache/kafka/pull/14746#issuecomment-1814363083
> @philipnee @kirktrue @lucasbru I am concerned that when one calls
`wakeup()` the application thread might stay blocked in waiting for a non-empty
fetch buffer.
Yes. The applica
cadonna commented on PR #14746:
URL: https://github.com/apache/kafka/pull/14746#issuecomment-1814256948
@philipnee @kirktrue @lucasbru I am concerned that when one calls `wakeup()`
the application thread might stay blocked in waiting for a non-empty fetch
buffer.
See
https://github.com
cadonna commented on PR #14746:
URL: https://github.com/apache/kafka/pull/14746#issuecomment-1814074041
> Is it possible to test poll would return normally after the consumer is
woken up and invoke poll
That is a good idea!
--
This is an automated message from the Apache Git Servi
philipnee commented on PR #14746:
URL: https://github.com/apache/kafka/pull/14746#issuecomment-1813035808
hi @cadonna - I think we probably don't have to worry about it here. I have
one comment: Is it possible to test poll would return normally after the
consumer is woken up and invoke pol
cadonna commented on PR #14746:
URL: https://github.com/apache/kafka/pull/14746#issuecomment-1813027684
I am still not sure what the conclusion is regarding:
https://github.com/apache/kafka/pull/14746#issuecomment-1809158196
--
This is an automated message from the Apache Git Service.
To
cadonna commented on code in PR #14746:
URL: https://github.com/apache/kafka/pull/14746#discussion_r1394572019
##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -415,6 +439,27 @@ public ConsumerRecords poll(final Duration timeout
cadonna commented on code in PR #14746:
URL: https://github.com/apache/kafka/pull/14746#discussion_r1394571238
##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -415,6 +439,27 @@ public ConsumerRecords poll(final Duration timeout
cadonna commented on code in PR #14746:
URL: https://github.com/apache/kafka/pull/14746#discussion_r1394568452
##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##
@@ -168,6 +178,44 @@ public void testCommitted_ExceptionThrown() {
philipnee commented on code in PR #14746:
URL: https://github.com/apache/kafka/pull/14746#discussion_r1394496768
##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -415,6 +439,27 @@ public ConsumerRecords poll(final Duration timeo
lucasbru commented on code in PR #14746:
URL: https://github.com/apache/kafka/pull/14746#discussion_r1394418301
##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -415,6 +439,27 @@ public ConsumerRecords poll(final Duration timeou
lucasbru commented on code in PR #14746:
URL: https://github.com/apache/kafka/pull/14746#discussion_r1394385232
##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -415,6 +439,27 @@ public ConsumerRecords poll(final Duration timeou
cadonna commented on PR #14746:
URL: https://github.com/apache/kafka/pull/14746#issuecomment-1812010386
@philipnee @kirktrue Thanks for your comments and explanation! I totally
missed that if the future is completed exceptionally it throws an
`ExecutionException`. The javadocs of `completeE
philipnee commented on PR #14746:
URL: https://github.com/apache/kafka/pull/14746#issuecomment-1810853645
Basically future.get() API only return 3 types of exceptions:
ExecutionException, InterruptedException, and Cancellation per documentation.
--
This is an automated message from the Ap
philipnee commented on PR #14746:
URL: https://github.com/apache/kafka/pull/14746#issuecomment-1810851585
Hi @cadonna - When the consumer is woken up. The WakeupTrigger should
complete the future exceptionally with WakeupException. To rethrow that
exception during future.get(), you will n
cadonna commented on PR #14746:
URL: https://github.com/apache/kafka/pull/14746#issuecomment-1810259619
@kirktrue I am not sure, I understand your comment about
`ExecutionException`. The future is not bound to any task, so can an
`ExecutionException` ever be thrown?
--
This is an automat
philipnee commented on PR #14746:
URL: https://github.com/apache/kafka/pull/14746#issuecomment-1809158196
Another point I want to make here is that the wakeup call also wakes-up the
blocking client. I wonder if we also need to do that to the network thread -
@kirktrue
--
This is an auto
philipnee commented on PR #14746:
URL: https://github.com/apache/kafka/pull/14746#issuecomment-1809152957
Hi @cadonna - Thank you for putting time into this PR. Based on my
understanding this PR does 2 things: if wakeup() is invoked before calling
poll(), the consumer will return immediate
cadonna commented on PR #14746:
URL: https://github.com/apache/kafka/pull/14746#issuecomment-1807924858
I will add tests after somebody confirms that this makes sense.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use th
cadonna commented on code in PR #14746:
URL: https://github.com/apache/kafka/pull/14746#discussion_r1390948525
##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##
@@ -335,10 +336,17 @@ public ConsumerRecords poll(final Duration ti
cadonna opened a new pull request, #14746:
URL: https://github.com/apache/kafka/pull/14746
We need to be careful when aborting a long poll with wakeup() since the
consumer might never return records if the poll is interrupted after the
consumer position has been updated but the records have
31 matches
Mail list logo