Re: [PR] KAFKA-15555: Ensure wakeups are handled correctly in poll() [kafka]

2023-11-23 Thread via GitHub
cadonna merged PR #14746: URL: https://github.com/apache/kafka/pull/14746 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.

Re: [PR] KAFKA-15555: Ensure wakeups are handled correctly in poll() [kafka]

2023-11-23 Thread via GitHub
cadonna commented on PR #14746: URL: https://github.com/apache/kafka/pull/14746#issuecomment-1824090907 Build failures are unrelated. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific

Re: [PR] KAFKA-15555: Ensure wakeups are handled correctly in poll() [kafka]

2023-11-17 Thread via GitHub
cadonna commented on code in PR #14746: URL: https://github.com/apache/kafka/pull/14746#discussion_r1397156239 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchBuffer.java: ## @@ -185,6 +188,16 @@ void awaitNotEmpty(Timer timer) { } } +

Re: [PR] KAFKA-15555: Ensure wakeups are handled correctly in poll() [kafka]

2023-11-17 Thread via GitHub
cadonna commented on code in PR #14746: URL: https://github.com/apache/kafka/pull/14746#discussion_r1397154783 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchBuffer.java: ## @@ -52,6 +53,8 @@ public class FetchBuffer implements AutoCloseable { pri

Re: [PR] KAFKA-15555: Ensure wakeups are handled correctly in poll() [kafka]

2023-11-17 Thread via GitHub
lucasbru commented on code in PR #14746: URL: https://github.com/apache/kafka/pull/14746#discussion_r1397061812 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchBuffer.java: ## @@ -52,6 +53,8 @@ public class FetchBuffer implements AutoCloseable { pr

Re: [PR] KAFKA-15555: Ensure wakeups are handled correctly in poll() [kafka]

2023-11-17 Thread via GitHub
cadonna commented on PR #14746: URL: https://github.com/apache/kafka/pull/14746#issuecomment-1816006382 > Do you think it is worth adding a ticket about using the BackgroundEventQueue for the fetches? Let me know if you have a concrete idea about how to implement this. Let me think a

Re: [PR] KAFKA-15555: Ensure wakeups are handled correctly in poll() [kafka]

2023-11-17 Thread via GitHub
cadonna commented on PR #14746: URL: https://github.com/apache/kafka/pull/14746#issuecomment-1816005251 > I wonder if we could just use a BlockingQueue for the fetchBuffer because fetchBuffer.poll(time) blocks until non-empty or timeout. As far as I see from the javadocs of a `Blockin

Re: [PR] KAFKA-15555: Ensure wakeups are handled correctly in poll() [kafka]

2023-11-16 Thread via GitHub
philipnee commented on PR #14746: URL: https://github.com/apache/kafka/pull/14746#issuecomment-1815756879 @cadonna - There are some build failures. Can we rerun the tests? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and

Re: [PR] KAFKA-15555: Ensure wakeups are handled correctly in poll() [kafka]

2023-11-16 Thread via GitHub
philipnee commented on PR #14746: URL: https://github.com/apache/kafka/pull/14746#issuecomment-1815755117 Hi @cadonna - I was about to reply with the same idea you proposed. I think that would work. I wonder if we could just use a `BlockingQueue` for the fetchBuffer because fetchBuffer.poll

Re: [PR] KAFKA-15555: Ensure wakeups are handled correctly in poll() [kafka]

2023-11-16 Thread via GitHub
kirktrue commented on PR #14746: URL: https://github.com/apache/kafka/pull/14746#issuecomment-1815469269 Can we add the `ctr` label here for easier discovery? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL abo

Re: [PR] KAFKA-15555: Ensure wakeups are handled correctly in poll() [kafka]

2023-11-16 Thread via GitHub
cadonna commented on PR #14746: URL: https://github.com/apache/kafka/pull/14746#issuecomment-1814405730 Thanks for the options, @lucasbru! In the meanwhile, I also had an idea to store the reference to the application thread in a wakeupable task similar to your `FetchAction` task. A

Re: [PR] KAFKA-15555: Ensure wakeups are handled correctly in poll() [kafka]

2023-11-16 Thread via GitHub
lucasbru commented on PR #14746: URL: https://github.com/apache/kafka/pull/14746#issuecomment-1814363083 > @philipnee @kirktrue @lucasbru I am concerned that when one calls `wakeup()` the application thread might stay blocked in waiting for a non-empty fetch buffer. Yes. The applica

Re: [PR] KAFKA-15555: Ensure wakeups are handled correctly in poll() [kafka]

2023-11-16 Thread via GitHub
cadonna commented on PR #14746: URL: https://github.com/apache/kafka/pull/14746#issuecomment-1814256948 @philipnee @kirktrue @lucasbru I am concerned that when one calls `wakeup()` the application thread might stay blocked in waiting for a non-empty fetch buffer. See https://github.com

Re: [PR] KAFKA-15555: Ensure wakeups are handled correctly in poll() [kafka]

2023-11-16 Thread via GitHub
cadonna commented on PR #14746: URL: https://github.com/apache/kafka/pull/14746#issuecomment-1814074041 > Is it possible to test poll would return normally after the consumer is woken up and invoke poll That is a good idea! -- This is an automated message from the Apache Git Servi

Re: [PR] KAFKA-15555: Ensure wakeups are handled correctly in poll() [kafka]

2023-11-15 Thread via GitHub
philipnee commented on PR #14746: URL: https://github.com/apache/kafka/pull/14746#issuecomment-1813035808 hi @cadonna - I think we probably don't have to worry about it here. I have one comment: Is it possible to test poll would return normally after the consumer is woken up and invoke pol

Re: [PR] KAFKA-15555: Ensure wakeups are handled correctly in poll() [kafka]

2023-11-15 Thread via GitHub
cadonna commented on PR #14746: URL: https://github.com/apache/kafka/pull/14746#issuecomment-1813027684 I am still not sure what the conclusion is regarding: https://github.com/apache/kafka/pull/14746#issuecomment-1809158196 -- This is an automated message from the Apache Git Service. To

Re: [PR] KAFKA-15555: Ensure wakeups are handled correctly in poll() [kafka]

2023-11-15 Thread via GitHub
cadonna commented on code in PR #14746: URL: https://github.com/apache/kafka/pull/14746#discussion_r1394572019 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -415,6 +439,27 @@ public ConsumerRecords poll(final Duration timeout

Re: [PR] KAFKA-15555: Ensure wakeups are handled correctly in poll() [kafka]

2023-11-15 Thread via GitHub
cadonna commented on code in PR #14746: URL: https://github.com/apache/kafka/pull/14746#discussion_r1394571238 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -415,6 +439,27 @@ public ConsumerRecords poll(final Duration timeout

Re: [PR] KAFKA-15555: Ensure wakeups are handled correctly in poll() [kafka]

2023-11-15 Thread via GitHub
cadonna commented on code in PR #14746: URL: https://github.com/apache/kafka/pull/14746#discussion_r1394568452 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java: ## @@ -168,6 +178,44 @@ public void testCommitted_ExceptionThrown() {

Re: [PR] KAFKA-15555: Ensure wakeups are handled correctly in poll() [kafka]

2023-11-15 Thread via GitHub
philipnee commented on code in PR #14746: URL: https://github.com/apache/kafka/pull/14746#discussion_r1394496768 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -415,6 +439,27 @@ public ConsumerRecords poll(final Duration timeo

Re: [PR] KAFKA-15555: Ensure wakeups are handled correctly in poll() [kafka]

2023-11-15 Thread via GitHub
lucasbru commented on code in PR #14746: URL: https://github.com/apache/kafka/pull/14746#discussion_r1394418301 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -415,6 +439,27 @@ public ConsumerRecords poll(final Duration timeou

Re: [PR] KAFKA-15555: Ensure wakeups are handled correctly in poll() [kafka]

2023-11-15 Thread via GitHub
lucasbru commented on code in PR #14746: URL: https://github.com/apache/kafka/pull/14746#discussion_r1394385232 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -415,6 +439,27 @@ public ConsumerRecords poll(final Duration timeou

Re: [PR] KAFKA-15555: Ensure wakeups are handled correctly in poll() [kafka]

2023-11-15 Thread via GitHub
cadonna commented on PR #14746: URL: https://github.com/apache/kafka/pull/14746#issuecomment-1812010386 @philipnee @kirktrue Thanks for your comments and explanation! I totally missed that if the future is completed exceptionally it throws an `ExecutionException`. The javadocs of `completeE

Re: [PR] KAFKA-15555: Ensure wakeups are handled correctly in poll() [kafka]

2023-11-14 Thread via GitHub
philipnee commented on PR #14746: URL: https://github.com/apache/kafka/pull/14746#issuecomment-1810853645 Basically future.get() API only return 3 types of exceptions: ExecutionException, InterruptedException, and Cancellation per documentation. -- This is an automated message from the Ap

Re: [PR] KAFKA-15555: Ensure wakeups are handled correctly in poll() [kafka]

2023-11-14 Thread via GitHub
philipnee commented on PR #14746: URL: https://github.com/apache/kafka/pull/14746#issuecomment-1810851585 Hi @cadonna - When the consumer is woken up. The WakeupTrigger should complete the future exceptionally with WakeupException. To rethrow that exception during future.get(), you will n

Re: [PR] KAFKA-15555: Ensure wakeups are handled correctly in poll() [kafka]

2023-11-14 Thread via GitHub
cadonna commented on PR #14746: URL: https://github.com/apache/kafka/pull/14746#issuecomment-1810259619 @kirktrue I am not sure, I understand your comment about `ExecutionException`. The future is not bound to any task, so can an `ExecutionException` ever be thrown? -- This is an automat

Re: [PR] KAFKA-15555: Ensure wakeups are handled correctly in poll() [kafka]

2023-11-13 Thread via GitHub
philipnee commented on PR #14746: URL: https://github.com/apache/kafka/pull/14746#issuecomment-1809158196 Another point I want to make here is that the wakeup call also wakes-up the blocking client. I wonder if we also need to do that to the network thread - @kirktrue -- This is an auto

Re: [PR] KAFKA-15555: Ensure wakeups are handled correctly in poll() [kafka]

2023-11-13 Thread via GitHub
philipnee commented on PR #14746: URL: https://github.com/apache/kafka/pull/14746#issuecomment-1809152957 Hi @cadonna - Thank you for putting time into this PR. Based on my understanding this PR does 2 things: if wakeup() is invoked before calling poll(), the consumer will return immediate

Re: [PR] KAFKA-15555: Ensure wakeups are handled correctly in poll() [kafka]

2023-11-13 Thread via GitHub
cadonna commented on PR #14746: URL: https://github.com/apache/kafka/pull/14746#issuecomment-1807924858 I will add tests after somebody confirms that this makes sense. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use th

Re: [PR] KAFKA-15555: Ensure wakeups are handled correctly in poll() [kafka]

2023-11-13 Thread via GitHub
cadonna commented on code in PR #14746: URL: https://github.com/apache/kafka/pull/14746#discussion_r1390948525 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java: ## @@ -335,10 +336,17 @@ public ConsumerRecords poll(final Duration ti

[PR] KAFKA-15555: Ensure wakeups are handled correctly in poll() [kafka]

2023-11-13 Thread via GitHub
cadonna opened a new pull request, #14746: URL: https://github.com/apache/kafka/pull/14746 We need to be careful when aborting a long poll with wakeup() since the consumer might never return records if the poll is interrupted after the consumer position has been updated but the records have