cadonna commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1609353060
##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
@@ -281,64 +276,15 @@ void testEnsureMetadataUpdateOnPoll() {
cadonna commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1609353060
##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
@@ -281,64 +276,15 @@ void testEnsureMetadataUpdateOnPoll() {
cadonna commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1609353060
##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
@@ -281,64 +276,15 @@ void testEnsureMetadataUpdateOnPoll() {
cadonna commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1609312616
##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
@@ -281,64 +276,15 @@ void testEnsureMetadataUpdateOnPoll() {
kirktrue commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1608934073
##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
@@ -281,64 +276,15 @@ void testEnsureMetadataUpdateOnPoll() {
lianetm commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1608897877
##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
@@ -281,64 +276,15 @@ void testEnsureMetadataUpdateOnPoll() {
lianetm commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1608897877
##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
@@ -281,64 +276,15 @@ void testEnsureMetadataUpdateOnPoll() {
lianetm commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1608897877
##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
@@ -281,64 +276,15 @@ void testEnsureMetadataUpdateOnPoll() {
lianetm commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1608897877
##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
@@ -281,64 +276,15 @@ void testEnsureMetadataUpdateOnPoll() {
lianetm commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1608897877
##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
@@ -281,64 +276,15 @@ void testEnsureMetadataUpdateOnPoll() {
kirktrue commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1608875582
##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##
@@ -329,6 +334,17 @@ public void
lianetm commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1608870767
##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##
@@ -329,6 +334,17 @@ public void
kirktrue commented on PR #15640:
URL: https://github.com/apache/kafka/pull/15640#issuecomment-2123163240
@lianetm @cadonna—The latest batch of feedback has been addressed. Thanks!
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to
kirktrue commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1608732398
##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1848,6 +1821,40 @@ private void subscribeInternal(Collection
kirktrue commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1608730407
##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableEventReaper.java:
##
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software
lianetm commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1608614638
##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableEventReaper.java:
##
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software
kirktrue commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1608607604
##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1295,21 +1268,21 @@ private void close(Duration timeout, boolean
kirktrue commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1608607062
##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1848,6 +1821,40 @@ private void subscribeInternal(Collection
kirktrue commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1608595900
##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableEventReaper.java:
##
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software
kirktrue commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1608584519
##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
@@ -281,64 +276,15 @@ void testEnsureMetadataUpdateOnPoll() {
kirktrue commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1608542683
##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
@@ -294,6 +299,46 @@ void testEnsureEventsAreCompleted() {
kirktrue commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1608542270
##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##
@@ -150,6 +151,7 @@ public class AsyncKafkaConsumerTest {
kirktrue commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1608541856
##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
Review Comment:
Resolving this thread as there have been
kirktrue commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1608541319
##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java:
##
@@ -144,6 +150,36 @@ void runOnce() {
kirktrue commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1608540876
##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1892,13 +1897,13 @@ private void subscribeInternal(Collection
kirktrue commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1608539565
##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableEventReaper.java:
##
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software
kirktrue commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1608534184
##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##
@@ -1835,7 +1849,31 @@ public void
lianetm commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1608516652
##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1848,6 +1821,40 @@ private void subscribeInternal(Collection
lianetm commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1608516652
##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1848,6 +1821,40 @@ private void subscribeInternal(Collection
lianetm commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1608487528
##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableEventReaper.java:
##
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software
lianetm commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1608453186
##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1295,21 +1268,21 @@ private void close(Duration timeout, boolean
lianetm commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1608418633
##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##
@@ -329,6 +334,17 @@ public void
lianetm commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1608418633
##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##
@@ -329,6 +334,17 @@ public void
cadonna commented on PR #15640:
URL: https://github.com/apache/kafka/pull/15640#issuecomment-2122575967
> Hey @cadonna, the tricky bit is that, for some events, the request
managers do expire requests too, so in this flow you described:
>
> > The event is processed in the
cadonna commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1608024280
##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##
@@ -1835,7 +1849,31 @@ public void
kirktrue commented on PR #15640:
URL: https://github.com/apache/kafka/pull/15640#issuecomment-2118579935
@lianetm @cadonna—I believe I have addressed all the actionable feedback.
Are there additional concerns about this PR that prevent it from being merged?
Thanks.
--
This is an
kirktrue commented on PR #15640:
URL: https://github.com/apache/kafka/pull/15640#issuecomment-2118578191
> Hey @cadonna, the tricky bit is that, for some events, the request
managers do expire requests too, so in this flow you described:
>
> > The event is processed in the
lianetm commented on PR #15640:
URL: https://github.com/apache/kafka/pull/15640#issuecomment-2117763853
Hey @cadonna, the tricky bit is that, for some events, the request managers
do expire requests too, so in this flow you described:
> The event is processed in the
cadonna commented on PR #15640:
URL: https://github.com/apache/kafka/pull/15640#issuecomment-2117012544
> High level comment, just to clarify and make sure it's something we are
considering and will cover with the follow-up PRs for timeouts. Here we're
introducing a component to ensure
kirktrue commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1604239000
##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableEventReaper.java:
##
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software
kirktrue commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1603994499
##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##
@@ -1846,7 +1849,34 @@ public void
kirktrue commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1603994190
##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##
@@ -150,6 +151,7 @@ public class AsyncKafkaConsumerTest {
lianetm commented on PR #15640:
URL: https://github.com/apache/kafka/pull/15640#issuecomment-2115938603
High level comment, just to clarify and make sure it's something we are
considering and will cover with the follow-up PRs for timeouts. Here we're
introducing a component to ensure that
lianetm commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1603740393
##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableEventReaper.java:
##
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software
lianetm commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1603682545
##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableEventReaper.java:
##
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software
kirktrue commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1602382027
##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
@@ -294,6 +299,46 @@ void testEnsureEventsAreCompleted() {
kirktrue commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1602381867
##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
@@ -88,6 +91,7 @@ public class ConsumerNetworkThreadTest {
cadonna commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1601414540
##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##
@@ -150,6 +151,7 @@ public class AsyncKafkaConsumerTest {
kirktrue commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1600781735
##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##
Review Comment:
I added a test to `AsyncKafkaConsumerTest`.
kirktrue commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1600781636
##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
Review Comment:
Oh, the code definitely has smells!
cadonna commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1599699017
##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##
Review Comment:
See my comment above. I am sure you will find
cadonna commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1599698758
##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
Review Comment:
Usually if it takes some contortion to
kirktrue commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1599266953
##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableEventReaper.java:
##
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software
kirktrue commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1599244816
##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##
Review Comment:
I will try my best to add tests to
kirktrue commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1599244057
##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
Review Comment:
I will add tests for the
kirktrue commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1599243592
##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/CompletableEventReaperTest.java:
##
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache
kirktrue commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1599243426
##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/CompletableEventReaperTest.java:
##
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache
kirktrue commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1599243250
##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/CompletableEventReaperTest.java:
##
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache
kirktrue commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1599236609
##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/CompletableEventReaperTest.java:
##
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache
kirktrue commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1599235694
##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/CompletableEventReaperTest.java:
##
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache
kirktrue commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1599233659
##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/CompletableEventReaperTest.java:
##
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache
kirktrue commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1599232355
##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableApplicationEvent.java:
##
@@ -32,13 +29,9 @@ public abstract class
kirktrue commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1599232056
##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java:
##
@@ -144,6 +150,36 @@ void runOnce() {
cadonna commented on PR #15640:
URL: https://github.com/apache/kafka/pull/15640#issuecomment-2092808466
> Is there any problem if we leave
`awaitPendingAsyncCommitsAndExecuteCommitCallbacks` as is? It clearly needs the
timer
Yeah, I think that we should leave this as it is for now.
cadonna commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1589007924
##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/CompletableEventReaperTest.java:
##
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache
lucasbru commented on PR #15640:
URL: https://github.com/apache/kafka/pull/15640#issuecomment-2092616162
Is there any problem if we leave
`awaitPendingAsyncCommitsAndExecuteCommitCallbacks`? It clearly needs the timer
--
This is an automated message from the Apache Git Service.
To
cadonna commented on PR #15640:
URL: https://github.com/apache/kafka/pull/15640#issuecomment-2090837554
> > > > > Here I have a comment, I could not put at the right location in
the code:
> > > > > On line 1362, in commitSync() the consumer waits on the
commitFuture with a timer. I
cadonna commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1587841410
##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableEventReaper.java:
##
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software
cadonna commented on PR #15640:
URL: https://github.com/apache/kafka/pull/15640#issuecomment-2090517636
> > > > Here I have a comment, I could not put at the right location in the
code:
> > > > On line 1362, in commitSync() the consumer waits on the commitFuture
with a timer. I think,
cadonna commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1587519244
##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableEventReaper.java:
##
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software
kirktrue commented on PR #15640:
URL: https://github.com/apache/kafka/pull/15640#issuecomment-2089079055
> > > Here I have a comment, I could not put at the right location in the
code:
> > > On line 1362, in commitSync() the consumer waits on the commitFuture
with a timer. I think, it
kirktrue commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1586632167
##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1169,8 +1129,7 @@ private Map
beginningOrEndOffset(Collection
kirktrue commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1586626516
##
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java:
##
@@ -987,6 +987,7 @@ public void testResetUsingAutoResetPolicy(GroupProtocol
kirktrue commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1586622390
##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1273,6 +1228,22 @@ private void close(Duration timeout, boolean
kirktrue commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1586620323
##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableEvent.java:
##
@@ -16,9 +16,118 @@
*/
package
kirktrue commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1586620323
##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableEvent.java:
##
@@ -16,9 +16,118 @@
*/
package
kirktrue commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1586607598
##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CommitEvent.java:
##
@@ -30,12 +29,7 @@ public abstract class CommitEvent extends
kirktrue commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1586603709
##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java:
##
@@ -273,9 +310,18 @@ void cleanup() {
kirktrue commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1586559943
##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java:
##
@@ -273,9 +310,18 @@ void cleanup() {
kirktrue commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1586557142
##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1892,13 +1897,13 @@ private void subscribeInternal(Collection
kirktrue commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1586557142
##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1892,13 +1897,13 @@ private void subscribeInternal(Collection
kirktrue commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1586542049
##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1853,6 +1824,40 @@ private void subscribeInternal(Collection
kirktrue commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1586539952
##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableEventReaper.java:
##
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software
kirktrue commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1586537358
##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableEventReaper.java:
##
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software
kirktrue commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1586524076
##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableEventReaper.java:
##
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software
kirktrue commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1586499310
##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableEventReaper.java:
##
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software
philipnee commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1585558712
##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CommitEvent.java:
##
@@ -30,12 +29,7 @@ public abstract class CommitEvent extends
philipnee commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1585179214
##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1273,6 +1228,22 @@ private void close(Duration timeout, boolean
lianetm commented on PR #15640:
URL: https://github.com/apache/kafka/pull/15640#issuecomment-2087359722
Hey @kirktrue, thanks a lot for the PR, this is a big piece! I completed a
pass of all the non-test files, left some comments.
--
This is an automated message from the Apache Git
lianetm commented on PR #15640:
URL: https://github.com/apache/kafka/pull/15640#issuecomment-2087355371
> > Here I have a comment, I could not put at the right location in the code:
> >
> > On line 1362, in commitSync() the consumer waits on the commitFuture
with a timer. I think, it
lianetm commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1585466853
##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableEvent.java:
##
@@ -16,9 +16,118 @@
*/
package
lianetm commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1585435927
##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CommitEvent.java:
##
@@ -30,12 +29,7 @@ public abstract class CommitEvent extends
lianetm commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1585390985
##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java:
##
@@ -273,9 +310,18 @@ void cleanup() {
lianetm commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1585390985
##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java:
##
@@ -273,9 +310,18 @@ void cleanup() {
lianetm commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1585152310
##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1892,13 +1897,13 @@ private void subscribeInternal(Collection
lianetm commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1585035057
##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1853,6 +1824,40 @@ private void subscribeInternal(Collection
lianetm commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1585091606
##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1892,13 +1897,13 @@ private void subscribeInternal(Collection
lianetm commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1585083395
##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1892,13 +1897,13 @@ private void subscribeInternal(Collection
lianetm commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1585035057
##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1853,6 +1824,40 @@ private void subscribeInternal(Collection
lianetm commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1585004533
##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableEventReaper.java:
##
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software
1 - 100 of 129 matches
Mail list logo