Re: [PR] KAFKA-16764: New consumer should throw InvalidTopicException on poll when invalid topic in metadata. [kafka]

2024-06-10 Thread via GitHub


appchemist commented on PR #16043:
URL: https://github.com/apache/kafka/pull/16043#issuecomment-2159551660

   Thanks for the review, @lucasbru!
   Thank you again for taking the time to review, @philipnee @lianetm @kirktrue.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16764: New consumer should throw InvalidTopicException on poll when invalid topic in metadata. [kafka]

2024-06-10 Thread via GitHub


appchemist commented on code in PR #16043:
URL: https://github.com/apache/kafka/pull/16043#discussion_r1634006443


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegateTest.java:
##
@@ -199,4 +234,4 @@ public void prepareFindCoordinatorResponse(Errors error) {
 private Node mockNode() {
 return new Node(0, "localhost", 99);
 }
-}
+}

Review Comment:
   Got it, thank you!






Re: [PR] KAFKA-16764: New consumer should throw InvalidTopicException on poll when invalid topic in metadata. [kafka]

2024-06-10 Thread via GitHub


lucasbru commented on code in PR #16043:
URL: https://github.com/apache/kafka/pull/16043#discussion_r1633531270


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegateTest.java:
##
@@ -199,4 +234,4 @@ public void prepareFindCoordinatorResponse(Errors error) {
 private Node mockNode() {
 return new Node(0, "localhost", 99);
 }
-}
+}

Review Comment:
   Yeah, I saw that as well, but given that we are so close to code freeze, I let it slide :)






Re: [PR] KAFKA-16764: New consumer should throw InvalidTopicException on poll when invalid topic in metadata. [kafka]

2024-06-10 Thread via GitHub


lucasbru merged PR #16043:
URL: https://github.com/apache/kafka/pull/16043





Re: [PR] KAFKA-16764: New consumer should throw InvalidTopicException on poll when invalid topic in metadata. [kafka]

2024-06-10 Thread via GitHub


kirktrue commented on code in PR #16043:
URL: https://github.com/apache/kafka/pull/16043#discussion_r1633528798


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegateTest.java:
##
@@ -199,4 +234,4 @@ public void prepareFindCoordinatorResponse(Errors error) {
 private Node mockNode() {
 return new Node(0, "localhost", 99);
 }
-}
+}

Review Comment:
   Super nitpick: avoid whitespace changes.






Re: [PR] KAFKA-16764: New consumer should throw InvalidTopicException on poll when invalid topic in metadata [kafka]

2024-06-07 Thread via GitHub


appchemist commented on PR #16043:
URL: https://github.com/apache/kafka/pull/16043#issuecomment-2155732666

   @philipnee & @lianetm Thanks for the review!





Re: [PR] KAFKA-16764: New consumer should throw InvalidTopicException on poll when invalid topic in metadata [kafka]

2024-06-07 Thread via GitHub


lianetm commented on code in PR #16043:
URL: https://github.com/apache/kafka/pull/16043#discussion_r1631473205


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegateTest.java:
##
@@ -169,14 +180,38 @@ public void testHasAnyPendingRequests() throws Exception {
 }
 }
 
+@Test
+public void testPropagateMetadataError() {
+AuthenticationException authException = new AuthenticationException("Test Auth Exception");
+doThrow(authException).when(metadata).maybeThrowAnyException();
+
+LinkedList backgroundEventQueue = new LinkedList<>();
+this.backgroundEventHandler = new BackgroundEventHandler(backgroundEventQueue);
+NetworkClientDelegate networkClientDelegate = newNetworkClientDelegate();
+
+assertEquals(0, backgroundEventQueue.size());
+networkClientDelegate.poll(0, time.milliseconds());
+assertEquals(1, backgroundEventQueue.size());
+
+BackgroundEvent event = backgroundEventQueue.poll();
+assertNotNull(event);
+assertEquals(BackgroundEvent.Type.ERROR, event.type());
+assertEquals(authException, ((ErrorEvent) event).error());

Review Comment:
   My bad, you got it (I was probably looking at an old commit?). All good.






Re: [PR] KAFKA-16764: New consumer should throw InvalidTopicException on poll when invalid topic in metadata [kafka]

2024-06-07 Thread via GitHub


appchemist commented on code in PR #16043:
URL: https://github.com/apache/kafka/pull/16043#discussion_r1631469076


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegateTest.java:
##
@@ -169,14 +180,38 @@ public void testHasAnyPendingRequests() throws Exception {
 }
 }
 
+@Test
+public void testPropagateMetadataError() {
+AuthenticationException authException = new AuthenticationException("Test Auth Exception");
+doThrow(authException).when(metadata).maybeThrowAnyException();
+
+LinkedList backgroundEventQueue = new LinkedList<>();
+this.backgroundEventHandler = new BackgroundEventHandler(backgroundEventQueue);
+NetworkClientDelegate networkClientDelegate = newNetworkClientDelegate();
+
+assertEquals(0, backgroundEventQueue.size());
+networkClientDelegate.poll(0, time.milliseconds());
+assertEquals(1, backgroundEventQueue.size());
+
+BackgroundEvent event = backgroundEventQueue.poll();
+assertNotNull(event);
+assertEquals(BackgroundEvent.Type.ERROR, event.type());
+assertEquals(authException, ((ErrorEvent) event).error());

Review Comment:
   I don't understand.






Re: [PR] KAFKA-16764: New consumer should throw InvalidTopicException on poll when invalid topic in metadata [kafka]

2024-06-07 Thread via GitHub


lianetm commented on code in PR #16043:
URL: https://github.com/apache/kafka/pull/16043#discussion_r1631466015


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegateTest.java:
##
@@ -169,14 +180,38 @@ public void testHasAnyPendingRequests() throws Exception {
 }
 }
 
+@Test
+public void testPropagateMetadataError() {
+AuthenticationException authException = new AuthenticationException("Test Auth Exception");
+doThrow(authException).when(metadata).maybeThrowAnyException();
+
+LinkedList backgroundEventQueue = new LinkedList<>();
+this.backgroundEventHandler = new BackgroundEventHandler(backgroundEventQueue);
+NetworkClientDelegate networkClientDelegate = newNetworkClientDelegate();
+
+assertEquals(0, backgroundEventQueue.size());
+networkClientDelegate.poll(0, time.milliseconds());
+assertEquals(1, backgroundEventQueue.size());
+
+BackgroundEvent event = backgroundEventQueue.poll();
+assertNotNull(event);
+assertEquals(BackgroundEvent.Type.ERROR, event.type());
+assertEquals(authException, ((ErrorEvent) event).error());

Review Comment:
   I see you created `authException` above, but we're not using it here to simplify the assertion. Do you plan to?






Re: [PR] KAFKA-16764: New consumer should throw InvalidTopicException on poll when invalid topic in metadata [kafka]

2024-06-07 Thread via GitHub


lianetm commented on PR #16043:
URL: https://github.com/apache/kafka/pull/16043#issuecomment-2155187907

   Thanks for the changes @appchemist! Could we update the PR description to:
   - Remove the "... a new PR has been created" line (the other PR is linked in many places throughout the discussion, and we don't want it in the squashed commit message).
   - Extend the description to mention that this fix ensures that metadata errors are thrown to the user on consumer.poll.
   





Re: [PR] KAFKA-16764: New consumer should throw InvalidTopicException on poll when invalid topic in metadata [kafka]

2024-06-07 Thread via GitHub


appchemist commented on code in PR #16043:
URL: https://github.com/apache/kafka/pull/16043#discussion_r1631458459


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
@@ -83,12 +84,14 @@
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+@SuppressWarnings("classfanoutcomplexity")
 public class ConsumerNetworkThreadTest {
 
 private ConsumerTestBuilder testBuilder;
 private Time time;
 private ConsumerMetadata metadata;
 private NetworkClientDelegate networkClient;
+private BlockingQueue backgroundEventsQueue;

Review Comment:
   right, I missed that






Re: [PR] KAFKA-16764: New consumer should throw InvalidTopicException on poll when invalid topic in metadata [kafka]

2024-06-07 Thread via GitHub


lianetm commented on code in PR #16043:
URL: https://github.com/apache/kafka/pull/16043#discussion_r1631455569


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
@@ -83,12 +84,14 @@
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+@SuppressWarnings("classfanoutcomplexity")
 public class ConsumerNetworkThreadTest {
 
 private ConsumerTestBuilder testBuilder;
 private Time time;
 private ConsumerMetadata metadata;
 private NetworkClientDelegate networkClient;
+private BlockingQueue backgroundEventsQueue;

Review Comment:
   Seems we don't use this anymore, right? And we probably don't need the `@SuppressWarnings` either?






Re: [PR] KAFKA-16764: New consumer should throw InvalidTopicException on poll when invalid topic in metadata [kafka]

2024-06-07 Thread via GitHub


appchemist commented on code in PR #16043:
URL: https://github.com/apache/kafka/pull/16043#discussion_r1631404977


##
core/src/test/scala/integration/kafka/api/PlaintextConsumerSubscriptionTest.scala:
##
@@ -223,6 +225,27 @@ class PlaintextConsumerSubscriptionTest extends AbstractConsumerTest {
 assertEquals(0, consumer.assignment.size())
   }
 
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testSubscribeInvalidTopic(quorum: String, groupProtocol: String): Unit = {
+// Invalid topic name due to space
+val invalidTopicName = "topic abc"
+val consumer = createConsumer()
+
+// subscribe invalid topic

Review Comment:
   You mean to delete line 235, right?






Re: [PR] KAFKA-16764: New consumer should throw InvalidTopicException on poll when invalid topic in metadata [kafka]

2024-06-07 Thread via GitHub


philipnee commented on code in PR #16043:
URL: https://github.com/apache/kafka/pull/16043#discussion_r1631410054


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegateTest.java:
##
@@ -169,14 +184,63 @@ public void testHasAnyPendingRequests() throws Exception {
 }
 }
 
+@Test
+public void testPropagateMetadataError() {
+String exMsg = "Test Auth Exception";
+doThrow(new AuthenticationException(exMsg)).when(metadata).maybeThrowAnyException();
+
+LinkedList backgroundEventQueue = new LinkedList<>();
+this.backgroundEventHandler = new BackgroundEventHandler(backgroundEventQueue);
+NetworkClientDelegate networkClientDelegate = newNetworkClientDelegate();
+
+
+assertEquals(0, backgroundEventQueue.size());
+networkClientDelegate.poll(0, time.milliseconds());
+assertEquals(1, backgroundEventQueue.size());
+
+ErrorEvent event = (ErrorEvent) backgroundEventQueue.poll();
+assertNotNull(event);
+assertEquals(AuthenticationException.class, event.error().getClass());
+assertEquals(exMsg, event.error().getMessage());
+}
+
+@Test
+public void testPropagateInvalidTopicMetadataError() {
+LinkedList backgroundEventQueue = new LinkedList<>();
+this.backgroundEventHandler = new BackgroundEventHandler(backgroundEventQueue);
+this.metadata = new Metadata(100, 100, 5,

Review Comment:
   yeah, we should just remove one.






Re: [PR] KAFKA-16764: New consumer should throw InvalidTopicException on poll when invalid topic in metadata [kafka]

2024-06-07 Thread via GitHub


appchemist commented on code in PR #16043:
URL: https://github.com/apache/kafka/pull/16043#discussion_r1631404977


##
core/src/test/scala/integration/kafka/api/PlaintextConsumerSubscriptionTest.scala:
##
@@ -223,6 +225,27 @@ class PlaintextConsumerSubscriptionTest extends AbstractConsumerTest {
 assertEquals(0, consumer.assignment.size())
   }
 
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testSubscribeInvalidTopic(quorum: String, groupProtocol: String): Unit = {
+// Invalid topic name due to space
+val invalidTopicName = "topic abc"
+val consumer = createConsumer()
+
+// subscribe invalid topic

Review Comment:
   You mean to delete line 234, right?






Re: [PR] KAFKA-16764: New consumer should throw InvalidTopicException on poll when invalid topic in metadata [kafka]

2024-06-07 Thread via GitHub


appchemist commented on code in PR #16043:
URL: https://github.com/apache/kafka/pull/16043#discussion_r1631404294


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegateTest.java:
##
@@ -169,14 +184,63 @@ public void testHasAnyPendingRequests() throws Exception {
 }
 }
 
+@Test
+public void testPropagateMetadataError() {
+String exMsg = "Test Auth Exception";
+doThrow(new AuthenticationException(exMsg)).when(metadata).maybeThrowAnyException();
+
+LinkedList backgroundEventQueue = new LinkedList<>();
+this.backgroundEventHandler = new BackgroundEventHandler(backgroundEventQueue);
+NetworkClientDelegate networkClientDelegate = newNetworkClientDelegate();
+
+
+assertEquals(0, backgroundEventQueue.size());
+networkClientDelegate.poll(0, time.milliseconds());
+assertEquals(1, backgroundEventQueue.size());
+
+ErrorEvent event = (ErrorEvent) backgroundEventQueue.poll();
+assertNotNull(event);
+assertEquals(AuthenticationException.class, event.error().getClass());
+assertEquals(exMsg, event.error().getMessage());
+}
+
+@Test
+public void testPropagateInvalidTopicMetadataError() {
+LinkedList backgroundEventQueue = new LinkedList<>();
+this.backgroundEventHandler = new BackgroundEventHandler(backgroundEventQueue);
+this.metadata = new Metadata(100, 100, 5,

Review Comment:
   There is a suggestion to remove that test, what do you think?
   https://github.com/apache/kafka/pull/16043#discussion_r1631395106
   






Re: [PR] KAFKA-16764: New consumer should throw InvalidTopicException on poll when invalid topic in metadata [kafka]

2024-06-07 Thread via GitHub


lianetm commented on code in PR #16043:
URL: https://github.com/apache/kafka/pull/16043#discussion_r1631397697


##
core/src/test/scala/integration/kafka/api/PlaintextConsumerSubscriptionTest.scala:
##
@@ -223,6 +225,27 @@ class PlaintextConsumerSubscriptionTest extends AbstractConsumerTest {
 assertEquals(0, consumer.assignment.size())
   }
 
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testSubscribeInvalidTopic(quorum: String, groupProtocol: String): Unit = {
+// Invalid topic name due to space
+val invalidTopicName = "topic abc"
+val consumer = createConsumer()
+
+// subscribe invalid topic

Review Comment:
   nit: I would remove this comment; the line below seems clear enough.






Re: [PR] KAFKA-16764: New consumer should throw InvalidTopicException on poll when invalid topic in metadata [kafka]

2024-06-07 Thread via GitHub


philipnee commented on code in PR #16043:
URL: https://github.com/apache/kafka/pull/16043#discussion_r1631396237


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegateTest.java:
##
@@ -169,14 +184,63 @@ public void testHasAnyPendingRequests() throws Exception {
 }
 }
 
+@Test
+public void testPropagateMetadataError() {
+String exMsg = "Test Auth Exception";
+doThrow(new AuthenticationException(exMsg)).when(metadata).maybeThrowAnyException();
+
+LinkedList backgroundEventQueue = new LinkedList<>();
+this.backgroundEventHandler = new BackgroundEventHandler(backgroundEventQueue);
+NetworkClientDelegate networkClientDelegate = newNetworkClientDelegate();
+
+
+assertEquals(0, backgroundEventQueue.size());
+networkClientDelegate.poll(0, time.milliseconds());
+assertEquals(1, backgroundEventQueue.size());
+
+ErrorEvent event = (ErrorEvent) backgroundEventQueue.poll();
+assertNotNull(event);
+assertEquals(AuthenticationException.class, event.error().getClass());
+assertEquals(exMsg, event.error().getMessage());
+}
+
+@Test
+public void testPropagateInvalidTopicMetadataError() {
+LinkedList backgroundEventQueue = new LinkedList<>();
+this.backgroundEventHandler = new BackgroundEventHandler(backgroundEventQueue);
+this.metadata = new Metadata(100, 100, 5,

Review Comment:
   Can we use the mocked metadata here as well?






Re: [PR] KAFKA-16764: New consumer should throw InvalidTopicException on poll when invalid topic in metadata [kafka]

2024-06-07 Thread via GitHub


lianetm commented on code in PR #16043:
URL: https://github.com/apache/kafka/pull/16043#discussion_r1631395106


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegateTest.java:
##
@@ -169,14 +184,63 @@ public void testHasAnyPendingRequests() throws Exception {
 }
 }
 
+@Test
+public void testPropagateMetadataError() {
+String exMsg = "Test Auth Exception";
+doThrow(new AuthenticationException(exMsg)).when(metadata).maybeThrowAnyException();
+
+LinkedList backgroundEventQueue = new LinkedList<>();
+this.backgroundEventHandler = new BackgroundEventHandler(backgroundEventQueue);
+NetworkClientDelegate networkClientDelegate = newNetworkClientDelegate();
+
+
+assertEquals(0, backgroundEventQueue.size());
+networkClientDelegate.poll(0, time.milliseconds());
+assertEquals(1, backgroundEventQueue.size());
+
+ErrorEvent event = (ErrorEvent) backgroundEventQueue.poll();
+assertNotNull(event);
+assertEquals(AuthenticationException.class, event.error().getClass());
+assertEquals(exMsg, event.error().getMessage());
+}
+
+@Test
+public void testPropagateInvalidTopicMetadataError() {

Review Comment:
   This test seems to be testing exactly the same thing as `testPropagateMetadataError`, but generating the error differently (error generation is logic that belongs to the Metadata class, not this one; that's why we end up having to create a metadata object and not a mock).
   
   I could be missing something, but it seems to me we should remove this one and just keep the one above, which ensures that the error propagation we're responsible for here happens as expected (no matter how the error made it to the metadata object). Does that make sense?
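
A minimal sketch of this point, assuming a delegate whose only job is to forward whatever the metadata object throws. `MetadataSketch`, `DelegateSketch`, and `pollOnceWith` are hypothetical names for illustration, not the Kafka classes, and a plain lambda stands in for the Mockito mock. Because the stub throws on demand, the test never needs to know how a real Metadata instance would have produced the error:

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical stand-ins for illustration only; these are not the Kafka classes.
final class PropagationSketch {

    // The only behaviour the delegate depends on: a call that may throw.
    interface MetadataSketch {
        void maybeThrowAnyException();
    }

    // Assumed responsibility of the class under test: catch and enqueue the error.
    static final class DelegateSketch {
        private final MetadataSketch metadata;
        private final Queue<RuntimeException> errors;

        DelegateSketch(MetadataSketch metadata, Queue<RuntimeException> errors) {
            this.metadata = metadata;
            this.errors = errors;
        }

        void poll() {
            try {
                metadata.maybeThrowAnyException();
            } catch (RuntimeException e) {
                errors.add(e);
            }
        }
    }

    // Stubs the metadata to throw the given exception, polls once,
    // and returns whatever ended up in the error queue (or null).
    static RuntimeException pollOnceWith(RuntimeException toThrow) {
        Queue<RuntimeException> errors = new ArrayDeque<>();
        MetadataSketch stub = () -> {
            throw toThrow;
        };
        new DelegateSketch(stub, errors).poll();
        return errors.poll();
    }

    public static void main(String[] args) {
        RuntimeException expected = new IllegalArgumentException("invalid topic");
        System.out.println(pollOnceWith(expected) == expected);
    }
}
```

With a stub like this, any exception type exercises the same propagation path, which may be why a single propagation test is enough at this level.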






Re: [PR] KAFKA-16764: New consumer should throw InvalidTopicException on poll when invalid topic in metadata [kafka]

2024-06-07 Thread via GitHub


philipnee commented on code in PR #16043:
URL: https://github.com/apache/kafka/pull/16043#discussion_r1631394278


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
@@ -360,6 +367,18 @@ void testSendUnsentRequest() {
 assertFalse(networkClient.hasAnyPendingRequests());
 }
 
+@Test
+void testMetadataErrorEvent() {

Review Comment:
   For this test we are essentially checking whether networkClientDelegate enqueues an error event to the background event queue, so I don't think we need this test here, since you've already written tests in the network client delegate test and the integration test.






Re: [PR] KAFKA-16764: New consumer should throw InvalidTopicException on poll when invalid topic in metadata [kafka]

2024-06-07 Thread via GitHub


lianetm commented on code in PR #16043:
URL: https://github.com/apache/kafka/pull/16043#discussion_r1631389671


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegateTest.java:
##
@@ -169,14 +184,63 @@ public void testHasAnyPendingRequests() throws Exception {
 }
 }
 
+@Test
+public void testPropagateMetadataError() {
+String exMsg = "Test Auth Exception";
+doThrow(new AuthenticationException(exMsg)).when(metadata).maybeThrowAnyException();
+
+LinkedList backgroundEventQueue = new LinkedList<>();
+this.backgroundEventHandler = new BackgroundEventHandler(backgroundEventQueue);
+NetworkClientDelegate networkClientDelegate = newNetworkClientDelegate();
+
+
+assertEquals(0, backgroundEventQueue.size());
+networkClientDelegate.poll(0, time.milliseconds());
+assertEquals(1, backgroundEventQueue.size());
+
+ErrorEvent event = (ErrorEvent) backgroundEventQueue.poll();
+assertNotNull(event);
+assertEquals(AuthenticationException.class, event.error().getClass());
+assertEquals(exMsg, event.error().getMessage());

Review Comment:
   We could simplify this to a single `assertEquals` comparing `event.error()` with the exception we mocked and expect here.
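
A minimal sketch of why one equality check can replace the separate class and message checks, assuming the error is propagated as the same instance. `ErrorEventSketch` and `propagate` are hypothetical stand-ins, not the Kafka classes, and `Objects.equals` stands in for JUnit's `assertEquals`. Since `Throwable` does not override `equals`, the comparison succeeds only for the very exception object that was thrown:

```java
import java.util.ArrayDeque;
import java.util.Objects;
import java.util.Queue;

// Hypothetical stand-ins for illustration only; these are not the Kafka classes.
final class SingleAssertSketch {

    // Carries the propagated exception, loosely mirroring an error event.
    static final class ErrorEventSketch {
        private final RuntimeException error;

        ErrorEventSketch(RuntimeException error) {
            this.error = error;
        }

        RuntimeException error() {
            return error;
        }
    }

    // Assumed propagation step: the exception is enqueued unchanged.
    static void propagate(RuntimeException e, Queue<ErrorEventSketch> queue) {
        queue.add(new ErrorEventSketch(e));
    }

    // True when the polled event carries the very same exception instance.
    static boolean sameInstancePropagated() {
        RuntimeException authException = new IllegalStateException("Test Auth Exception");
        Queue<ErrorEventSketch> queue = new ArrayDeque<>();
        propagate(authException, queue);
        ErrorEventSketch event = queue.poll();
        // Throwable does not override equals, so this is an identity comparison.
        return event != null && Objects.equals(authException, event.error());
    }

    // True when two distinct exceptions with the same class and message compare unequal.
    static boolean lookAlikesAreNotEqual() {
        RuntimeException a = new IllegalStateException("Test Auth Exception");
        RuntimeException b = new IllegalStateException("Test Auth Exception");
        return !a.equals(b);
    }

    public static void main(String[] args) {
        System.out.println(sameInstancePropagated());
        System.out.println(lookAlikesAreNotEqual());
    }
}
```

The single check is therefore stricter than comparing class and message: it would fail if the error were wrapped or re-created along the way.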






Re: [PR] KAFKA-16764: New consumer should throw InvalidTopicException on poll when invalid topic in metadata [kafka]

2024-06-07 Thread via GitHub


lianetm commented on code in PR #16043:
URL: https://github.com/apache/kafka/pull/16043#discussion_r1631386757


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegateTest.java:
##
@@ -169,14 +184,63 @@ public void testHasAnyPendingRequests() throws Exception {
 }
 }
 
+@Test
+public void testPropagateMetadataError() {
+String exMsg = "Test Auth Exception";
+doThrow(new 
AuthenticationException(exMsg)).when(metadata).maybeThrowAnyException();
+
+LinkedList backgroundEventQueue = new LinkedList<>();
+this.backgroundEventHandler = new 
BackgroundEventHandler(backgroundEventQueue);
+NetworkClientDelegate networkClientDelegate = 
newNetworkClientDelegate();
+
+

Review Comment:
   nit: extra line






Re: [PR] KAFKA-16764: New consumer should throw InvalidTopicException on poll when invalid topic in metadata [kafka]

2024-06-07 Thread via GitHub


lianetm commented on code in PR #16043:
URL: https://github.com/apache/kafka/pull/16043#discussion_r1631385696


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
@@ -360,6 +367,18 @@ void testSendUnsentRequest() {
 assertFalse(networkClient.hasAnyPendingRequests());
 }
 
+@Test
+void testMetadataErrorEvent() {
+metadata.fatalError(new AuthenticationException("Authentication 
failed"));
+
+consumerNetworkThread.runOnce();
+BackgroundEvent event = backgroundEventsQueue.poll();
+assertNotNull(event);
+assertEquals(BackgroundEvent.Type.ERROR, event.type());
+assertEquals(AuthenticationException.class, ((ErrorEvent) 
event).error().getClass());
+assertEquals("Authentication failed", ((ErrorEvent) 
event).error().getMessage());

Review Comment:
   This could be simplified to a single `assertEquals(authException, ((ErrorEvent) event).error())`, with `authException` declared above and passed to `fatalError` on line 372.
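To illustrate the suggestion above, here is a minimal, self-contained sketch of why one equality check can replace the separate class and message checks. It does not use the Kafka classes: `ErrorEventSketch` is a hypothetical stand-in for `ErrorEvent`, and `IllegalStateException` stands in for `AuthenticationException`. The assumption is that the event carries the very same exception instance that was injected, which is what the reviewed tests appear to rely on.

```java
import java.util.ArrayDeque;
import java.util.Objects;
import java.util.Queue;

// Hypothetical stand-in for the ErrorEvent carried on the background queue.
final class ErrorEventSketch {
    private final RuntimeException error;

    ErrorEventSketch(RuntimeException error) {
        this.error = error;
    }

    RuntimeException error() {
        return error;
    }
}

final class SingleAssertSketch {
    // Returns true when the event carries exactly the exception that was injected.
    static boolean carriesInjectedError() {
        // Declared once, then reused both for injection and for the comparison.
        RuntimeException authException = new IllegalStateException("Authentication failed");
        Queue<ErrorEventSketch> backgroundEventQueue = new ArrayDeque<>();
        backgroundEventQueue.add(new ErrorEventSketch(authException));

        ErrorEventSketch event = backgroundEventQueue.poll();
        // Throwable does not override equals, so this is a reference comparison:
        // it holds only for the same instance, which implies the same class and message.
        return event != null && Objects.equals(authException, event.error());
    }

    public static void main(String[] args) {
        System.out.println(carriesInjectedError());
    }
}
```

Because the comparison is by reference, it is stricter than checking class and message separately; it would fail if the production code wrapped or copied the exception.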






Re: [PR] KAFKA-16764: New consumer should throw InvalidTopicException on poll when invalid topic in metadata [kafka]

2024-06-07 Thread via GitHub


appchemist commented on PR #16043:
URL: https://github.com/apache/kafka/pull/16043#issuecomment-2155022679

   @philipnee & @lianetm Thanks for the review!





Re: [PR] KAFKA-16764: New consumer should throw InvalidTopicException on poll when invalid topic in metadata [kafka]

2024-06-07 Thread via GitHub


lianetm commented on code in PR #16043:
URL: https://github.com/apache/kafka/pull/16043#discussion_r1631337110


##
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##
@@ -189,6 +189,27 @@ class PlaintextConsumerTest extends BaseConsumerTest {
 assertThrows(classOf[InvalidTopicException], () => 
consumer.partitionsFor(";3# ads,{234"))
   }
 
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testSubscribeInvalidTopic(quorum: String, groupProtocol: String): Unit = 
{

Review Comment:
   Great addition, but this test would fit better in the `PlainTextConsumerSubscriptionTest` file.






Re: [PR] KAFKA-16764: New consumer should throw InvalidTopicException on poll when invalid topic in metadata [kafka]

2024-06-07 Thread via GitHub


philipnee commented on code in PR #16043:
URL: https://github.com/apache/kafka/pull/16043#discussion_r1631330338


##
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##
@@ -192,15 +192,22 @@ class PlaintextConsumerTest extends BaseConsumerTest {
   @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
   @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
   def testSubscribeInvalidTopic(quorum: String, groupProtocol: String): Unit = 
{
-val invalidTopicName = "topic abc"
+val invalidTopicName = "topic abc"  // Invalid topic name due to space
 val consumer = createConsumer()
 
 // subscribe invalid topic
 consumer.subscribe(List(invalidTopicName).asJava)
 
-consumer.poll(Duration.ZERO);
-val exception = assertThrows(classOf[InvalidTopicException], () => 
consumer.poll(Duration.ofMillis(5000)))
-assertEquals(String.format("Invalid topics: [%s]", invalidTopicName), 
exception.getMessage)
+var exception : Exception = null
+TestUtils.waitUntilTrue(() => {
+  try consumer.poll(Duration.ofMillis(500)) catch {
+case e : Exception => exception = e

Review Comment:
   also, we probably should fail the test if we get other exceptions.
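One way to read this request, sketched in plain Java rather than in the Scala test itself: catch only the expected exception type inside the wait loop and let anything else propagate, so an unexpected exception fails the test immediately. Everything here is hypothetical scaffolding: `InvalidTopicSketchException` stands in for `InvalidTopicException`, and `waitUntilTrue` is a simplified local helper, not Kafka's `TestUtils.waitUntilTrue`.

```java
import java.util.function.BooleanSupplier;

// Hypothetical stand-in for Kafka's InvalidTopicException.
final class InvalidTopicSketchException extends RuntimeException {
    InvalidTopicSketchException(String message) {
        super(message);
    }
}

final class WaitForExpectedErrorSketch {
    // Simplified polling helper: retries the condition until it holds or the timeout elapses.
    static void waitUntilTrue(BooleanSupplier condition, long waitTimeMs, String msg) {
        long deadline = System.currentTimeMillis() + waitTimeMs;
        while (!condition.getAsBoolean()) {
            if (System.currentTimeMillis() > deadline) {
                throw new AssertionError(msg);
            }
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new AssertionError("Interrupted while waiting", e);
            }
        }
    }

    // Polls until the expected exception type is seen; any other exception propagates.
    static InvalidTopicSketchException awaitInvalidTopic(Runnable poll, long waitTimeMs) {
        InvalidTopicSketchException[] caught = new InvalidTopicSketchException[1];
        waitUntilTrue(() -> {
            try {
                poll.run();
            } catch (InvalidTopicSketchException e) {
                caught[0] = e; // only the expected type ends the wait
            }
            return caught[0] != null;
        }, waitTimeMs, "An InvalidTopicException should be thrown.");
        return caught[0];
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Simulated poll: the error surfaces only on the third call.
        InvalidTopicSketchException e = awaitInvalidTopic(() -> {
            if (++calls[0] >= 3) {
                throw new InvalidTopicSketchException("Invalid topics: [topic abc]");
            }
        }, 5000);
        System.out.println(e.getMessage());
    }
}
```

Naming the expected exception in the timeout message also makes a timeout failure easier to diagnose.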






Re: [PR] KAFKA-16764: New consumer should throw InvalidTopicException on poll when invalid topic in metadata [kafka]

2024-06-07 Thread via GitHub


philipnee commented on code in PR #16043:
URL: https://github.com/apache/kafka/pull/16043#discussion_r1631328022


##
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##
@@ -192,15 +192,22 @@ class PlaintextConsumerTest extends BaseConsumerTest {
   @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
   @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
   def testSubscribeInvalidTopic(quorum: String, groupProtocol: String): Unit = 
{
-val invalidTopicName = "topic abc"
+val invalidTopicName = "topic abc"  // Invalid topic name due to space

Review Comment:
   nit: let's just break this into two lines
   
   ```
   // Invalid topic name due to space
   val invalidTopicName = "topic abc"
   ```






Re: [PR] KAFKA-16764: New consumer should throw InvalidTopicException on poll when invalid topic in metadata [kafka]

2024-06-07 Thread via GitHub


philipnee commented on code in PR #16043:
URL: https://github.com/apache/kafka/pull/16043#discussion_r1631326145


##
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##
@@ -192,15 +192,22 @@ class PlaintextConsumerTest extends BaseConsumerTest {
   @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
   @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
   def testSubscribeInvalidTopic(quorum: String, groupProtocol: String): Unit = 
{
-val invalidTopicName = "topic abc"
+val invalidTopicName = "topic abc"  // Invalid topic name due to space
 val consumer = createConsumer()
 
 // subscribe invalid topic
 consumer.subscribe(List(invalidTopicName).asJava)
 
-consumer.poll(Duration.ZERO);
-val exception = assertThrows(classOf[InvalidTopicException], () => 
consumer.poll(Duration.ofMillis(5000)))
-assertEquals(String.format("Invalid topics: [%s]", invalidTopicName), 
exception.getMessage)
+var exception : Exception = null
+TestUtils.waitUntilTrue(() => {
+  try consumer.poll(Duration.ofMillis(500)) catch {
+case e : Exception => exception = e
+  }
+  exception != null
+}, waitTimeMs = 5000, msg = "An exception should be thrown.")

Review Comment:
   let's be explicit about InvalidTopicException in the message part.






Re: [PR] KAFKA-16764: New consumer should throw InvalidTopicException on poll when invalid topic in metadata [kafka]

2024-06-07 Thread via GitHub


philipnee commented on code in PR #16043:
URL: https://github.com/apache/kafka/pull/16043#discussion_r1631322984


##
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##
@@ -192,15 +192,22 @@ class PlaintextConsumerTest extends BaseConsumerTest {
   @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
   @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
   def testSubscribeInvalidTopic(quorum: String, groupProtocol: String): Unit = 
{
-val invalidTopicName = "topic abc"
+val invalidTopicName = "topic abc"  // Invalid topic name due to space
 val consumer = createConsumer()
 
 // subscribe invalid topic
 consumer.subscribe(List(invalidTopicName).asJava)
 
-consumer.poll(Duration.ZERO);
-val exception = assertThrows(classOf[InvalidTopicException], () => 
consumer.poll(Duration.ofMillis(5000)))
-assertEquals(String.format("Invalid topics: [%s]", invalidTopicName), 
exception.getMessage)
+var exception : Exception = null
+TestUtils.waitUntilTrue(() => {
+  try consumer.poll(Duration.ofMillis(500)) catch {
+case e : Exception => exception = e

Review Comment:
   can we match for InvalidTopicException here? 
   ```
   case e: InvalidTopicException =>
   ```






Re: [PR] KAFKA-16764: New consumer should throw InvalidTopicException on poll when invalid topic in metadata [kafka]

2024-06-07 Thread via GitHub


philipnee commented on code in PR #16043:
URL: https://github.com/apache/kafka/pull/16043#discussion_r1631262656


##
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##
@@ -189,6 +189,20 @@ class PlaintextConsumerTest extends BaseConsumerTest {
 assertThrows(classOf[InvalidTopicException], () => 
consumer.partitionsFor(";3# ads,{234"))
   }
 
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testSubscribeInvalidTopic(quorum: String, groupProtocol: String): Unit = 
{
+val invalidTopicName = "topic abc"
+val consumer = createConsumer()
+
+// subscribe invalid topic
+consumer.subscribe(List(invalidTopicName).asJava)
+
+consumer.poll(Duration.ZERO);

Review Comment:
   Actually, looking at the code, it seems `updateAssignmentMetadataIfNeeded` should continue to poll the background queue, so I think I'm missing something.






Re: [PR] KAFKA-16764: New consumer should throw InvalidTopicException on poll when invalid topic in metadata [kafka]

2024-06-07 Thread via GitHub


appchemist commented on code in PR #16043:
URL: https://github.com/apache/kafka/pull/16043#discussion_r1631268525


##
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##
@@ -189,6 +189,20 @@ class PlaintextConsumerTest extends BaseConsumerTest {
 assertThrows(classOf[InvalidTopicException], () => 
consumer.partitionsFor(";3# ads,{234"))
   }
 
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testSubscribeInvalidTopic(quorum: String, groupProtocol: String): Unit = 
{
+val invalidTopicName = "topic abc"
+val consumer = createConsumer()
+
+// subscribe invalid topic
+consumer.subscribe(List(invalidTopicName).asJava)
+
+consumer.poll(Duration.ZERO);

Review Comment:
   @philipnee Thank you!






Re: [PR] KAFKA-16764: New consumer should throw InvalidTopicException on poll when invalid topic in metadata [kafka]

2024-06-07 Thread via GitHub


philipnee commented on code in PR #16043:
URL: https://github.com/apache/kafka/pull/16043#discussion_r1631236747


##
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##
@@ -189,6 +189,20 @@ class PlaintextConsumerTest extends BaseConsumerTest {
 assertThrows(classOf[InvalidTopicException], () => 
consumer.partitionsFor(";3# ads,{234"))
   }
 
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testSubscribeInvalidTopic(quorum: String, groupProtocol: String): Unit = 
{
+val invalidTopicName = "topic abc"
+val consumer = createConsumer()
+
+// subscribe invalid topic
+consumer.subscribe(List(invalidTopicName).asJava)
+
+consumer.poll(Duration.ZERO);

Review Comment:
   If you look at the log, you can see that the client polls the background event queue before the metadata error is received, because you poll immediately after subscribing. The old consumer, by contrast, keeps polling the network client during the long poll (the second poll) and propagates the exception when it receives it.
   
   You can try using `waitUntilTrue` to check that the exception is eventually thrown.
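The timing issue described above can be reproduced in isolation with a toy model. This is only a sketch under stated assumptions: a plain `BlockingQueue<String>` stands in for the background event queue, a helper thread stands in for the network thread, and a `CountDownLatch` models the arrival of the metadata response. None of these names come from the Kafka code base.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

final class BackgroundErrorTimingSketch {
    // Returns {result of the immediate poll, result of the later, waiting poll}.
    static String[] pollTwice() {
        BlockingQueue<String> backgroundEventQueue = new LinkedBlockingQueue<>();
        CountDownLatch metadataResponseArrived = new CountDownLatch(1);

        // Simulated network thread: enqueues the error only once the response arrives.
        Thread networkThread = new Thread(() -> {
            try {
                metadataResponseArrived.await();
                backgroundEventQueue.put("InvalidTopicException");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        networkThread.setDaemon(true);
        networkThread.start();

        try {
            // Poll right after "subscribe": the error has not been enqueued yet.
            String immediate = backgroundEventQueue.poll();
            // The metadata response arrives afterwards; a poll that waits picks up the error.
            metadataResponseArrived.countDown();
            String later = backgroundEventQueue.poll(5, TimeUnit.SECONDS);
            return new String[] {immediate, later};
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        String[] results = pollTwice();
        System.out.println(results[0] + ", " + results[1]);
    }
}
```

In this model the first poll always comes back empty, which is why a test that expects the error on a fixed poll can be flaky, while a retry loop such as `waitUntilTrue` is not.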






Re: [PR] KAFKA-16764: New consumer should throw InvalidTopicException on poll when invalid topic in metadata [kafka]

2024-06-07 Thread via GitHub


appchemist commented on code in PR #16043:
URL: https://github.com/apache/kafka/pull/16043#discussion_r1631102468


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegateTest.java:
##
@@ -169,14 +183,80 @@ public void testHasAnyPendingRequests() throws Exception {
 }
 }
 
+@Test
+public void testPropagateMetadataError() {
+LinkedList backgroundEventQueue = new LinkedList<>();
+Metadata metadata = new Metadata(100, 100, 5,

Review Comment:
   @philipnee Thanks for the review!






Re: [PR] KAFKA-16764: New consumer should throw InvalidTopicException on poll when invalid topic in metadata [kafka]

2024-06-07 Thread via GitHub


appchemist commented on code in PR #16043:
URL: https://github.com/apache/kafka/pull/16043#discussion_r1631101468


##
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##
@@ -189,6 +189,20 @@ class PlaintextConsumerTest extends BaseConsumerTest {
 assertThrows(classOf[InvalidTopicException], () => 
consumer.partitionsFor(";3# ads,{234"))
   }
 
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testSubscribeInvalidTopic(quorum: String, groupProtocol: String): Unit = 
{
+val invalidTopicName = "topic abc"
+val consumer = createConsumer()
+
+// subscribe invalid topic
+consumer.subscribe(List(invalidTopicName).asJava)
+
+consumer.poll(Duration.ZERO);

Review Comment:
   Without this line, the test fails when `groupProtocol` is `consumer`.
   When debugging, I saw that the consumer receives an `InvalidTopicException` after processing the background event.
   So I added `consumer.poll(Duration.ZERO)`.
   
   Do you have any thoughts on this? @philipnee & @lianetm 






Re: [PR] KAFKA-16764: New consumer should throw InvalidTopicException on poll when invalid topic in metadata [kafka]

2024-06-07 Thread via GitHub


appchemist commented on code in PR #16043:
URL: https://github.com/apache/kafka/pull/16043#discussion_r1630919078


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java:
##
@@ -145,6 +154,7 @@ void runOnce() {
 .map(networkClientDelegate::addAll)
 .reduce(MAX_POLL_TIMEOUT_MS, Math::min);
 networkClientDelegate.poll(pollWaitTimeMs, currentTimeMs);
+maybePropagateMetadataError();

Review Comment:
   I looked at `ConsumerNetworkThread.run()` because, with this change, `Metadata.fatalException` is now propagated along with `InvalidTopicException`.
   In that code, `networkClientDelegate.poll` is also called separately, outside `ConsumerNetworkThread.runOnce()`.
   ```
   // ConsumerNetworkThread.run()
   public void run() {
       try {
           ...
           while (running) {
               ...
               runOnce();
               ...
           }
       } finally {
           // call networkClientDelegate.poll()
           cleanup();
       }
   }
   ```
   Therefore, I thought that a Metadata error might be missed at least once when `AsyncKafkaConsumer.close()` is called while a Metadata error is pending.
   So I applied the check in the `NetworkClientDelegate`.
   
   Let me know if I've misunderstood.
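The reasoning above can be checked with a small model. This is a sketch only: `DelegateSketch` and `NetworkThreadSketch` are hypothetical simplifications of `NetworkClientDelegate` and `ConsumerNetworkThread`, and the `checkInsidePoll` flag exists purely to compare the two placements of the metadata check. It assumes, as described in the comment, that the cleanup path calls `poll()` directly without going through `runOnce()`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, heavily simplified model of the delegate that owns poll().
final class DelegateSketch {
    final List<String> backgroundEvents = new ArrayList<>();
    final boolean checkInsidePoll;
    String pendingMetadataError; // null when no error is pending

    DelegateSketch(boolean checkInsidePoll) {
        this.checkInsidePoll = checkInsidePoll;
    }

    void poll() {
        if (checkInsidePoll) {
            maybePropagateMetadataError();
        }
    }

    void maybePropagateMetadataError() {
        if (pendingMetadataError != null) {
            backgroundEvents.add(pendingMetadataError);
            pendingMetadataError = null;
        }
    }
}

// Hypothetical model of the network thread's two call sites for poll().
final class NetworkThreadSketch {
    private final DelegateSketch delegate;

    NetworkThreadSketch(DelegateSketch delegate) {
        this.delegate = delegate;
    }

    void runOnce() {
        delegate.poll();
        if (!delegate.checkInsidePoll) {
            // Alternative placement: check only in runOnce(), after poll().
            delegate.maybePropagateMetadataError();
        }
    }

    void cleanup() {
        delegate.poll(); // polls directly, bypassing runOnce()
    }

    // Simulates an error that appears during shutdown; returns how many events were propagated.
    static int errorsSeenOnClose(boolean checkInsidePoll) {
        DelegateSketch delegate = new DelegateSketch(checkInsidePoll);
        NetworkThreadSketch thread = new NetworkThreadSketch(delegate);
        thread.runOnce();
        delegate.pendingMetadataError = "AuthenticationException";
        thread.cleanup();
        return delegate.backgroundEvents.size();
    }

    public static void main(String[] args) {
        System.out.println("check in runOnce only: " + errorsSeenOnClose(false));
        System.out.println("check inside poll:     " + errorsSeenOnClose(true));
    }
}
```

In this model, placing the check inside `poll()` covers every caller, including the cleanup path, whereas a check that lives only in `runOnce()` misses an error that shows up during close.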






Re: [PR] KAFKA-16764: New consumer should throw InvalidTopicException on poll when invalid topic in metadata [kafka]

2024-06-07 Thread via GitHub


philipnee commented on code in PR #16043:
URL: https://github.com/apache/kafka/pull/16043#discussion_r1630912266


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
@@ -360,6 +370,52 @@ void testSendUnsentRequest() {
 assertFalse(networkClient.hasAnyPendingRequests());
 }
 
+@Test
+void testInvalidTopicMetadataErrorEvent() {
+String invalidTopicName = "topic abc";  // Invalid topic name due to 
space
+
+
when(testBuilder.subscriptions.matchesSubscribedPattern(invalidTopicName))
+.thenReturn(true);
+
+Cluster cluster = metadata.fetch();
+List topicMetadata = new ArrayList<>();
+topicMetadata.add(new 
MetadataResponse.TopicMetadata(Errors.INVALID_TOPIC_EXCEPTION,
+invalidTopicName, false, Collections.emptyList()));
+MetadataResponse updateResponse = 
RequestTestUtils.metadataResponse(cluster.nodes(),
+cluster.clusterResource().clusterId(),
+cluster.controller().id(),
+topicMetadata);
+
+client.prepareMetadataUpdate(updateResponse);
+metadata.requestUpdateForNewTopics();
+consumerNetworkThread.runOnce();
+
+BackgroundEvent event = backgroundEventsQueue.poll();
+assertNotNull(event);
+assertEquals(BackgroundEvent.Type.ERROR, event.type());
+assertEquals(InvalidTopicException.class, ((ErrorEvent) 
event).error().getClass());
+assertEquals(String.format("Invalid topics: [%s]", invalidTopicName),
+((ErrorEvent) event).error().getMessage());
+}
+
+@Test
+void testMetadataErrorEvent() {
+metadata.fatalError(new AuthenticationException("Authentication 
failed"));
+
+consumerNetworkThread.runOnce();
+BackgroundEvent event = backgroundEventsQueue.poll();
+assertNotNull(event);
+assertEquals(BackgroundEvent.Type.ERROR, event.type());
+assertEquals(AuthenticationException.class, ((ErrorEvent) 
event).error().getClass());
+assertEquals("Authentication failed", ((ErrorEvent) 
event).error().getMessage());
+}
+
+@Test
+void testNoMetadataErrorEvent() {

Review Comment:
   Same here: you don't need to test this in the consumer network thread.






Re: [PR] KAFKA-16764: New consumer should throw InvalidTopicException on poll when invalid topic in metadata [kafka]

2024-06-07 Thread via GitHub


philipnee commented on code in PR #16043:
URL: https://github.com/apache/kafka/pull/16043#discussion_r1630911599


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
@@ -360,6 +370,52 @@ void testSendUnsentRequest() {
 assertFalse(networkClient.hasAnyPendingRequests());
 }
 
+@Test
+void testInvalidTopicMetadataErrorEvent() {

Review Comment:
   I think it would be really great if you could create an integration test in `PlaintextConsumerTest` that mimics the topic metadata error.






Re: [PR] KAFKA-16764: New consumer should throw InvalidTopicException on poll when invalid topic in metadata [kafka]

2024-06-07 Thread via GitHub


philipnee commented on code in PR #16043:
URL: https://github.com/apache/kafka/pull/16043#discussion_r1630908222


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegateTest.java:
##
@@ -169,14 +183,80 @@ public void testHasAnyPendingRequests() throws Exception {
 }
 }
 
+@Test
+public void testPropagateMetadataError() {
+LinkedList backgroundEventQueue = new LinkedList<>();
+Metadata metadata = new Metadata(100, 100, 5,
+new LogContext(), new ClusterResourceListeners());
+NetworkClientDelegate networkClientDelegate = 
newNetworkClientDelegate(metadata,
+new BackgroundEventHandler(backgroundEventQueue));
+
+String exMsg = "Test Auth Exception";
+metadata.fatalError(new AuthenticationException(exMsg));
+assertEquals(0, backgroundEventQueue.size());
+
+networkClientDelegate.poll(0, time.milliseconds());
+assertEquals(1, backgroundEventQueue.size());
+
+ErrorEvent event = (ErrorEvent) backgroundEventQueue.poll();
+assertNotNull(event);
+assertEquals(AuthenticationException.class, event.error().getClass());
+assertEquals(exMsg, event.error().getMessage());
+}
+
+@Test
+public void testPropagateInvalidTopicMetadataError() {
+LinkedList backgroundEventQueue = new LinkedList<>();
+Metadata metadata = new Metadata(100, 100, 5,
+new LogContext(), new ClusterResourceListeners());
+NetworkClientDelegate networkClientDelegate = 
newNetworkClientDelegate(metadata,
+new BackgroundEventHandler(backgroundEventQueue));
+
+String invalidTopic = "invalid topic";
+MetadataResponse invalidTopicResponse = 
RequestTestUtils.metadataUpdateWith("clusterId", 1,
+Collections.singletonMap(invalidTopic, 
Errors.INVALID_TOPIC_EXCEPTION), Collections.emptyMap());
+metadata.updateWithCurrentRequestVersion(invalidTopicResponse, false, 
time.milliseconds());
+assertEquals(0, backgroundEventQueue.size());
+
+networkClientDelegate.poll(0, time.milliseconds());
+assertEquals(1, backgroundEventQueue.size());
+
+ErrorEvent event = (ErrorEvent) backgroundEventQueue.poll();
+assertNotNull(event);
+assertEquals(InvalidTopicException.class, event.error().getClass());
+assertEquals(String.format("Invalid topics: [%s]", invalidTopic),
+event.error().getMessage());
+}
+
 public NetworkClientDelegate newNetworkClientDelegate() {
 LogContext logContext = new LogContext();
 Properties properties = new Properties();
 properties.put(KEY_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class);
 properties.put(VALUE_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class);
 properties.put(GROUP_ID_CONFIG, GROUP_ID);
 properties.put(REQUEST_TIMEOUT_MS_CONFIG, REQUEST_TIMEOUT_MS);
-return new NetworkClientDelegate(this.time, new 
ConsumerConfig(properties), logContext, this.client);
+return new NetworkClientDelegate(this.time,
+new ConsumerConfig(properties),
+logContext,
+this.client,
+this.metadata,
+this.backgroundEventHandler);
+}
+
+public NetworkClientDelegate newNetworkClientDelegate(Metadata metadata,

Review Comment:
   probably not needed. you can do
   ```
   this.metadata = mock(Metadata.class)
   this.backgroundEventHandler = ...
   
   ncd = newNetworkClientDelegate()
   ```






Re: [PR] KAFKA-16764: New consumer should throw InvalidTopicException on poll when invalid topic in metadata [kafka]

2024-06-07 Thread via GitHub


philipnee commented on code in PR #16043:
URL: https://github.com/apache/kafka/pull/16043#discussion_r1630904040


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegateTest.java:
##
@@ -169,14 +183,80 @@ public void testHasAnyPendingRequests() throws Exception {
 }
 }
 
+@Test
+public void testPropagateMetadataError() {
+LinkedList backgroundEventQueue = new LinkedList<>();
+Metadata metadata = new Metadata(100, 100, 5,
+new LogContext(), new ClusterResourceListeners());
+NetworkClientDelegate networkClientDelegate = 
newNetworkClientDelegate(metadata,
+new BackgroundEventHandler(backgroundEventQueue));
+
+String exMsg = "Test Auth Exception";
+metadata.fatalError(new AuthenticationException(exMsg));
+assertEquals(0, backgroundEventQueue.size());
+
+networkClientDelegate.poll(0, time.milliseconds());
+assertEquals(1, backgroundEventQueue.size());
+
+ErrorEvent event = (ErrorEvent) backgroundEventQueue.poll();
+assertNotNull(event);
+assertEquals(AuthenticationException.class, event.error().getClass());
+assertEquals(exMsg, event.error().getMessage());
+}
+
+@Test
+public void testPropagateInvalidTopicMetadataError() {
+LinkedList backgroundEventQueue = new LinkedList<>();
+Metadata metadata = new Metadata(100, 100, 5,

Review Comment:
   Same comment as above.






Re: [PR] KAFKA-16764: New consumer should throw InvalidTopicException on poll when invalid topic in metadata [kafka]

2024-06-07 Thread via GitHub


philipnee commented on code in PR #16043:
URL: https://github.com/apache/kafka/pull/16043#discussion_r1630902382


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegateTest.java:
##
@@ -169,14 +183,80 @@ public void testHasAnyPendingRequests() throws Exception {
 }
 }
 
+@Test
+public void testPropagateMetadataError() {
+LinkedList backgroundEventQueue = new LinkedList<>();
+Metadata metadata = new Metadata(100, 100, 5,

Review Comment:
   You can do something like this instead of creating a real `Metadata` object, which is not needed here:
   
   ```
   Metadata metadata = mock(Metadata.class);
   NetworkClientDelegate networkClientDelegate = new NetworkClientDelegate(metadata,
       new BackgroundEventHandler(backgroundEventQueue));
   
   String exMsg = "Test Auth Exception";
   AuthenticationException authException = new AuthenticationException(exMsg);
   doThrow(authException).when(metadata).maybeThrowAnyException();
   ```
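
The idea behind this suggestion, replacing the real collaborator with a test double that throws on demand, can be sketched without Mockito. This is a minimal illustration, not the Kafka test code: `StubMetadata`, `StubDelegate`, and `failWith` are hypothetical stand-ins, and only the method name `maybeThrowAnyException` is taken from the snippet above.

```java
import java.util.ArrayDeque;
import java.util.Queue;

public class StubMetadataSketch {

    /** Hypothetical stand-in for Metadata: throws whatever error the test configures. */
    static class StubMetadata {
        private RuntimeException toThrow;

        void failWith(RuntimeException e) {
            this.toThrow = e;
        }

        void maybeThrowAnyException() {
            if (toThrow != null) {
                throw toThrow;
            }
        }
    }

    /** Hypothetical stand-in for the delegate under test. */
    static class StubDelegate {
        private final StubMetadata metadata;
        private final Queue<Throwable> backgroundEvents;

        StubDelegate(StubMetadata metadata, Queue<Throwable> backgroundEvents) {
            this.metadata = metadata;
            this.backgroundEvents = backgroundEvents;
        }

        /** Forwards any metadata error to the queue instead of letting it escape. */
        void poll() {
            try {
                metadata.maybeThrowAnyException();
            } catch (Exception e) {
                backgroundEvents.add(e);
            }
        }
    }

    public static void main(String[] args) {
        Queue<Throwable> events = new ArrayDeque<>();
        StubMetadata metadata = new StubMetadata();
        StubDelegate delegate = new StubDelegate(metadata, events);

        // Configure the double to fail, then check that poll() queued the error.
        metadata.failWith(new IllegalStateException("Test Auth Exception"));
        delegate.poll();

        Throwable event = events.poll();
        System.out.println(event.getClass().getSimpleName() + ": " + event.getMessage());
    }
}
```

The test then only has to control when the double throws; no real metadata state needs to be constructed.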






Re: [PR] KAFKA-16764: New consumer should throw InvalidTopicException on poll when invalid topic in metadata [kafka]

2024-06-07 Thread via GitHub


philipnee commented on code in PR #16043:
URL: https://github.com/apache/kafka/pull/16043#discussion_r1630893130


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
@@ -360,6 +370,52 @@ void testSendUnsentRequest() {
 assertFalse(networkClient.hasAnyPendingRequests());
 }
 
+@Test
+void testInvalidTopicMetadataErrorEvent() {

Review Comment:
   I don't think we need this test in the consumer network thread tests, as it only indirectly tests the network client delegate.






Re: [PR] KAFKA-16764: New consumer should throw InvalidTopicException on poll when invalid topic in metadata. [kafka]

2024-06-06 Thread via GitHub


appchemist closed pull request #15961: KAFKA-16764: New consumer should throw 
InvalidTopicException on poll when invalid topic in metadata.
URL: https://github.com/apache/kafka/pull/15961





Re: [PR] KAFKA-16764: New consumer should throw InvalidTopicException on poll when invalid topic in metadata. [kafka]

2024-06-06 Thread via GitHub


appchemist commented on PR #15961:
URL: https://github.com/apache/kafka/pull/15961#issuecomment-2153682806

   @philipnee right, I got it





Re: [PR] KAFKA-16764: New consumer should throw InvalidTopicException on poll when invalid topic in metadata [kafka]

2024-06-06 Thread via GitHub


appchemist commented on code in PR #16043:
URL: https://github.com/apache/kafka/pull/16043#discussion_r1630477551


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java:
##
@@ -145,6 +154,7 @@ void runOnce() {
 .map(networkClientDelegate::addAll)
 .reduce(MAX_POLL_TIMEOUT_MS, Math::min);
 networkClientDelegate.poll(pollWaitTimeMs, currentTimeMs);
+maybePropagateMetadataError();

Review Comment:
   @lianetm
   I got it.
   I will work on `NetworkClientDelegate.poll`
   
   > Any reason to have this here instead of inside the `NetworkClientDelegate` poll?
   
   With this work, I also wanted to address the following review points.
   https://github.com/apache/kafka/pull/16043#issuecomment-2130547360
   https://github.com/apache/kafka/pull/16043#issuecomment-2127634542






Re: [PR] KAFKA-16764: New consumer should throw InvalidTopicException on poll when invalid topic in metadata. [kafka]

2024-06-06 Thread via GitHub


philipnee commented on PR #15961:
URL: https://github.com/apache/kafka/pull/15961#issuecomment-2153668330

   hey @appchemist - thanks for putting time into the pr.  i assume 
[#16043](https://github.com/apache/kafka/pull/16043) is the pr you want us to 
review, no? it would be easier to just have a single opened pr and iterate on 
that in the future. do you want to close this if this pr is irrelevant now?





Re: [PR] KAFKA-16764: New consumer should throw InvalidTopicException on poll when invalid topic in metadata [kafka]

2024-06-06 Thread via GitHub


appchemist commented on code in PR #16043:
URL: https://github.com/apache/kafka/pull/16043#discussion_r1630477551


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java:
##
@@ -145,6 +154,7 @@ void runOnce() {
 .map(networkClientDelegate::addAll)
 .reduce(MAX_POLL_TIMEOUT_MS, Math::min);
 networkClientDelegate.poll(pollWaitTimeMs, currentTimeMs);
+maybePropagateMetadataError();

Review Comment:
   @lianetm Thanks for the review!
   I got it.
   I will work on `NetworkClientDelegate.poll`
   
   > Any reason to have this here instead of inside the `NetworkClientDelegate` poll?
   
   With this work, I also wanted to address the following review points.
   https://github.com/apache/kafka/pull/16043#issuecomment-2130547360
   https://github.com/apache/kafka/pull/16043#issuecomment-2127634542






Re: [PR] KAFKA-16764: New consumer should throw InvalidTopicException on poll when invalid topic in metadata [kafka]

2024-06-06 Thread via GitHub


philipnee commented on code in PR #16043:
URL: https://github.com/apache/kafka/pull/16043#discussion_r1630472373


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java:
##
@@ -145,6 +154,7 @@ void runOnce() {
 .map(networkClientDelegate::addAll)
 .reduce(MAX_POLL_TIMEOUT_MS, Math::min);
 networkClientDelegate.poll(pollWaitTimeMs, currentTimeMs);
+maybePropagateMetadataError();

Review Comment:
   hey @lianetm - it seems like `InvalidTopicException` is only thrown in `poll()`. i guess `InvalidTopicException` is irrelevant during closing as the client is shutting down anyway.






Re: [PR] KAFKA-16764: New consumer should throw InvalidTopicException on poll when invalid topic in metadata. [kafka]

2024-06-06 Thread via GitHub


appchemist commented on code in PR #15961:
URL: https://github.com/apache/kafka/pull/15961#discussion_r1630461949


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MetadataErrorManager.java:
##
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.Metadata;
+import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult;
+import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
+import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
+
+public class MetadataErrorManager implements RequestManager {
+private final Metadata metadata;
+private final BackgroundEventHandler backgroundEventHandler;
+
+public MetadataErrorManager(Metadata metadata, BackgroundEventHandler backgroundEventHandler) {
+assert metadata != null;
+assert backgroundEventHandler != null;
+
+this.metadata = metadata;
+this.backgroundEventHandler = backgroundEventHandler;
+}
+
+@Override
+public PollResult poll(long currentTimeMs) {
+maybePropagateMetadataError();
+
+return PollResult.EMPTY;
+}
+
+private void maybePropagateMetadataError() {
+try {
+metadata.maybeThrowAnyException();
+} catch (Exception e) {
+backgroundEventHandler.add(new ErrorEvent(e));
+}

Review Comment:
   @philipnee Right, please take a look at https://github.com/apache/kafka/pull/16043.
   Can I close this PR (https://github.com/apache/kafka/pull/15961)?






Re: [PR] KAFKA-16764: New consumer should throw InvalidTopicException on poll when invalid topic in metadata [kafka]

2024-06-06 Thread via GitHub


lianetm commented on PR #16043:
URL: https://github.com/apache/kafka/pull/16043#issuecomment-2153385925

   Hey @appchemist, thanks for the changes! Seems like a simpler approach. Left 
one question about the specific place to have the logic. Thanks!





Re: [PR] KAFKA-16764: New consumer should throw InvalidTopicException on poll when invalid topic in metadata [kafka]

2024-06-06 Thread via GitHub


lianetm commented on code in PR #16043:
URL: https://github.com/apache/kafka/pull/16043#discussion_r1630221371


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java:
##
@@ -145,6 +154,7 @@ void runOnce() {
 .map(networkClientDelegate::addAll)
 .reduce(MAX_POLL_TIMEOUT_MS, Math::min);
 networkClientDelegate.poll(pollWaitTimeMs, currentTimeMs);
+maybePropagateMetadataError();

Review Comment:
   Any reason to have this here instead of inside the `NetworkClientDelegate` 
poll? 
   
   I would expect it inside the NetworkClientDelegate poll, which would be the 
equivalent of the ConsumerNetworkClient (where the legacy consumer has this 
same logic). I know here we are calling them sequentially so we'll get the same 
result, but networkClientDelegate.poll is called from other places too (on 
close), so we wouldn't propagate the error in those cases I guess.
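
The placement argument above can be illustrated with a small self-contained sketch. It is not the Kafka implementation: `StubMetadata`, `StubDelegate`, and `StubNetworkThread` are hypothetical stand-ins, and the stub clearing its error once thrown is an assumption made for the sketch. Because the check lives inside `poll()`, every caller (the regular loop and the close path alike) propagates a pending error.

```java
import java.util.ArrayDeque;
import java.util.Queue;

public class PollPlacementSketch {

    /** Hypothetical stand-in for Metadata; assumed to clear its error once thrown. */
    static class StubMetadata {
        private RuntimeException pending;

        void fatalError(RuntimeException e) {
            pending = e;
        }

        void maybeThrowAnyException() {
            RuntimeException e = pending;
            pending = null;
            if (e != null) {
                throw e;
            }
        }
    }

    /** Hypothetical delegate: the error check is part of poll() itself. */
    static class StubDelegate {
        private final StubMetadata metadata;
        private final Queue<Throwable> backgroundEvents;

        StubDelegate(StubMetadata metadata, Queue<Throwable> backgroundEvents) {
            this.metadata = metadata;
            this.backgroundEvents = backgroundEvents;
        }

        void poll() {
            // Network I/O would happen here in a real client.
            try {
                metadata.maybeThrowAnyException();
            } catch (Exception e) {
                backgroundEvents.add(e);
            }
        }
    }

    /** Hypothetical thread with two call sites for poll(): the loop and cleanup. */
    static class StubNetworkThread {
        private final StubDelegate delegate;

        StubNetworkThread(StubDelegate delegate) {
            this.delegate = delegate;
        }

        void runOnce() {
            delegate.poll();
        }

        void cleanup() {
            delegate.poll();
        }
    }

    public static void main(String[] args) {
        Queue<Throwable> events = new ArrayDeque<>();
        StubMetadata metadata = new StubMetadata();
        StubNetworkThread thread = new StubNetworkThread(new StubDelegate(metadata, events));

        // The error arrives just before shutdown; cleanup() still propagates it.
        metadata.fatalError(new IllegalArgumentException("invalid topic"));
        thread.cleanup();
        System.out.println("events queued: " + events.size());
    }
}
```

Had the check been placed in `runOnce()` after the `poll()` call, the `cleanup()` path in this sketch would have queued nothing.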






Re: [PR] KAFKA-16764: New consumer should throw InvalidTopicException on poll when invalid topic in metadata. [kafka]

2024-06-06 Thread via GitHub


philipnee commented on code in PR #15961:
URL: https://github.com/apache/kafka/pull/15961#discussion_r1629846782


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MetadataErrorManager.java:
##
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.Metadata;
+import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult;
+import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
+import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
+
+public class MetadataErrorManager implements RequestManager {
+private final Metadata metadata;
+private final BackgroundEventHandler backgroundEventHandler;
+
+public MetadataErrorManager(Metadata metadata, BackgroundEventHandler backgroundEventHandler) {
+assert metadata != null;
+assert backgroundEventHandler != null;
+
+this.metadata = metadata;
+this.backgroundEventHandler = backgroundEventHandler;
+}
+
+@Override
+public PollResult poll(long currentTimeMs) {
+maybePropagateMetadataError();
+
+return PollResult.EMPTY;
+}
+
+private void maybePropagateMetadataError() {
+try {
+metadata.maybeThrowAnyException();
+} catch (Exception e) {
+backgroundEventHandler.add(new ErrorEvent(e));
+}

Review Comment:
   thanks, is this the PR to review? https://github.com/apache/kafka/pull/16043
   






Re: [PR] KAFKA-16764: New consumer should throw InvalidTopicException on poll when invalid topic in metadata. [kafka]

2024-06-06 Thread via GitHub


appchemist commented on code in PR #15961:
URL: https://github.com/apache/kafka/pull/15961#discussion_r1629752083


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MetadataErrorManager.java:
##
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.Metadata;
+import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult;
+import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
+import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
+
+public class MetadataErrorManager implements RequestManager {
+private final Metadata metadata;
+private final BackgroundEventHandler backgroundEventHandler;
+
+public MetadataErrorManager(Metadata metadata, BackgroundEventHandler backgroundEventHandler) {
+assert metadata != null;
+assert backgroundEventHandler != null;
+
+this.metadata = metadata;
+this.backgroundEventHandler = backgroundEventHandler;
+}
+
+@Override
+public PollResult poll(long currentTimeMs) {
+maybePropagateMetadataError();
+
+return PollResult.EMPTY;
+}
+
+private void maybePropagateMetadataError() {
+try {
+metadata.maybeThrowAnyException();
+} catch (Exception e) {
+backgroundEventHandler.add(new ErrorEvent(e));
+}

Review Comment:
   Hey @philipnee - no. If you agree, I will close this PR.






[PR] KAFKA-16764: New consumer should throw InvalidTopicException on poll when invalid topic in metadata. [kafka]

2024-06-06 Thread via GitHub


appchemist opened a new pull request, #15961:
URL: https://github.com/apache/kafka/pull/15961

   - Add ErrorPropagateMetadataUpdater
     - Just a proxy, but it propagates errors through BackgroundEventHandler
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





Re: [PR] KAFKA-16764: New consumer should throw InvalidTopicException on poll when invalid topic in metadata. [kafka]

2024-06-06 Thread via GitHub


philipnee commented on code in PR #15961:
URL: https://github.com/apache/kafka/pull/15961#discussion_r1629622755


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MetadataErrorManager.java:
##
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.Metadata;
+import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult;
+import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
+import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
+
+public class MetadataErrorManager implements RequestManager {
+private final Metadata metadata;
+private final BackgroundEventHandler backgroundEventHandler;
+
+public MetadataErrorManager(Metadata metadata, BackgroundEventHandler backgroundEventHandler) {
+assert metadata != null;
+assert backgroundEventHandler != null;
+
+this.metadata = metadata;
+this.backgroundEventHandler = backgroundEventHandler;
+}
+
+@Override
+public PollResult poll(long currentTimeMs) {
+maybePropagateMetadataError();
+
+return PollResult.EMPTY;
+}
+
+private void maybePropagateMetadataError() {
+try {
+metadata.maybeThrowAnyException();
+} catch (Exception e) {
+backgroundEventHandler.add(new ErrorEvent(e));
+}

Review Comment:
   hey @appchemist - seems like you closed this pr. do you mean to continue 
working on this PR?
   






Re: [PR] KAFKA-16764: New consumer should throw InvalidTopicException on poll when invalid topic in metadata. [kafka]

2024-06-06 Thread via GitHub


appchemist commented on PR #15961:
URL: https://github.com/apache/kafka/pull/15961#issuecomment-2152367583

   @philipnee Thanks for responding.
   
   1. No problem, I misunderstood.
   2. Implemented in `ConsumerNetworkThreadTest` (https://github.com/apache/kafka/pull/16043)
   
   In my view, propagating metadata errors after `KafkaClient.poll()` should be in line with the legacy consumer.
   It seems we can choose one of the options below:
   - Simple approach
     - https://github.com/apache/kafka/pull/15961#issuecomment-2135478481
     - https://github.com/apache/kafka/pull/15961#discussion_r1628247971
   - Wrapping metadata and a background queue in an object
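
One possible reading of the second option, sketched under assumptions: a small object owns both the error source and the event queue, so callers invoke a single method after polling. `ErrorSource`, `MetadataErrorPropagator`, and `propagateIfAny` are hypothetical names introduced for this sketch and do not appear in either PR.

```java
import java.util.ArrayDeque;
import java.util.Queue;

public class ErrorPropagatorSketch {

    /** Hypothetical: anything that may throw a stored error on demand. */
    interface ErrorSource {
        void maybeThrowAnyException();
    }

    /** Hypothetical wrapper bundling the error source with the background queue. */
    static class MetadataErrorPropagator {
        private final ErrorSource source;
        private final Queue<Throwable> backgroundEvents;

        MetadataErrorPropagator(ErrorSource source, Queue<Throwable> backgroundEvents) {
            this.source = source;
            this.backgroundEvents = backgroundEvents;
        }

        /** Queues the pending error, if any; returns true when one was queued. */
        boolean propagateIfAny() {
            try {
                source.maybeThrowAnyException();
                return false;
            } catch (Exception e) {
                backgroundEvents.add(e);
                return true;
            }
        }
    }

    public static void main(String[] args) {
        Queue<Throwable> events = new ArrayDeque<>();
        ErrorSource failing = () -> {
            throw new IllegalArgumentException("invalid topic");
        };

        MetadataErrorPropagator propagator = new MetadataErrorPropagator(failing, events);
        boolean queued = propagator.propagateIfAny();
        System.out.println("queued=" + queued + ", events=" + events.size());
    }
}
```

Compared with the simple approach, this keeps the try/catch in one place at the cost of an extra class.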





Re: [PR] KAFKA-16764: New consumer should throw InvalidTopicException on poll when invalid topic in metadata [kafka]

2024-06-06 Thread via GitHub


appchemist commented on PR #16043:
URL: https://github.com/apache/kafka/pull/16043#issuecomment-2151916716

   If you have a moment, please take a look, @lianetm, @philipnee, @kirktrue.
   
   I've applied the simple approach at the `ConsumerNetworkThread` level.
   I pushed it a few minutes ago, so if you looked at the code before then, please take a fresh look.





Re: [PR] KAFKA-16764: New consumer should throw InvalidTopicException on poll when invalid topic in metadata [kafka]

2024-06-05 Thread via GitHub


appchemist commented on PR #16043:
URL: https://github.com/apache/kafka/pull/16043#issuecomment-2151383230

   Reopening this PR for KAFKA-16764.





Re: [PR] KAFKA-16764: New consumer should throw InvalidTopicException on poll when invalid topic in metadata. [kafka]

2024-06-05 Thread via GitHub


appchemist closed pull request #15961: KAFKA-16764: New consumer should throw 
InvalidTopicException on poll when invalid topic in metadata.
URL: https://github.com/apache/kafka/pull/15961





Re: [PR] KAFKA-16764: New consumer should throw InvalidTopicException on poll when invalid topic in metadata. [kafka]

2024-06-05 Thread via GitHub


appchemist commented on code in PR #15961:
URL: https://github.com/apache/kafka/pull/15961#discussion_r1628553681


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MetadataErrorManager.java:
##
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.Metadata;
+import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult;
+import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
+import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
+
+public class MetadataErrorManager implements RequestManager {
+private final Metadata metadata;
+private final BackgroundEventHandler backgroundEventHandler;
+
+public MetadataErrorManager(Metadata metadata, BackgroundEventHandler backgroundEventHandler) {
+assert metadata != null;
+assert backgroundEventHandler != null;
+
+this.metadata = metadata;
+this.backgroundEventHandler = backgroundEventHandler;
+}
+
+@Override
+public PollResult poll(long currentTimeMs) {
+maybePropagateMetadataError();
+
+return PollResult.EMPTY;
+}
+
+private void maybePropagateMetadataError() {
+try {
+metadata.maybeThrowAnyException();
+} catch (Exception e) {
+backgroundEventHandler.add(new ErrorEvent(e));
+}

Review Comment:
   @philipnee How about moving these five lines into `ConsumerNetworkThread`?
   I will add an integration test in `ConsumerNetworkThreadTest`.






Re: [PR] KAFKA-16764: New consumer should throw InvalidTopicException on poll when invalid topic in metadata. [kafka]

2024-06-05 Thread via GitHub


appchemist commented on code in PR #15961:
URL: https://github.com/apache/kafka/pull/15961#discussion_r1628553681


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MetadataErrorManager.java:
##
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.Metadata;
+import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult;
+import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
+import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
+
+public class MetadataErrorManager implements RequestManager {
+private final Metadata metadata;
+private final BackgroundEventHandler backgroundEventHandler;
+
+public MetadataErrorManager(Metadata metadata, BackgroundEventHandler backgroundEventHandler) {
+assert metadata != null;
+assert backgroundEventHandler != null;
+
+this.metadata = metadata;
+this.backgroundEventHandler = backgroundEventHandler;
+}
+
+@Override
+public PollResult poll(long currentTimeMs) {
+maybePropagateMetadataError();
+
+return PollResult.EMPTY;
+}
+
+private void maybePropagateMetadataError() {
+try {
+metadata.maybeThrowAnyException();
+} catch (Exception e) {
+backgroundEventHandler.add(new ErrorEvent(e));
+}

Review Comment:
   @philipnee How about moving these five lines into `ConsumerNetworkThread`?
   I will add an integration test.






Re: [PR] KAFKA-16764: New consumer should throw InvalidTopicException on poll when invalid topic in metadata. [kafka]

2024-06-05 Thread via GitHub


kirktrue commented on PR #15961:
URL: https://github.com/apache/kafka/pull/15961#issuecomment-2151071098

   I think that the approach from #16043 is more in line with the approach I 
imagined.





Re: [PR] KAFKA-16764: New consumer should throw InvalidTopicException on poll when invalid topic in metadata. [kafka]

2024-06-05 Thread via GitHub


appchemist commented on code in PR #15961:
URL: https://github.com/apache/kafka/pull/15961#discussion_r1628496087


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MetadataErrorManager.java:
##
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.Metadata;
+import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult;
+import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
+import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
+
+public class MetadataErrorManager implements RequestManager {
+private final Metadata metadata;
+private final BackgroundEventHandler backgroundEventHandler;
+
+public MetadataErrorManager(Metadata metadata, BackgroundEventHandler backgroundEventHandler) {
+assert metadata != null;
+assert backgroundEventHandler != null;
+
+this.metadata = metadata;
+this.backgroundEventHandler = backgroundEventHandler;
+}
+
+@Override
+public PollResult poll(long currentTimeMs) {
+maybePropagateMetadataError();
+
+return PollResult.EMPTY;
+}
+
+private void maybePropagateMetadataError() {
+try {
+metadata.maybeThrowAnyException();
+} catch (Exception e) {
+backgroundEventHandler.add(new ErrorEvent(e));
+}

Review Comment:
   I agree.
   If these five lines are moved into `ConsumerNetworkThread`, I think the 
drawbacks of the simple version mentioned by @philipnee can also be addressed.






Re: [PR] KAFKA-16764: New consumer should throw InvalidTopicException on poll when invalid topic in metadata. [kafka]

2024-06-05 Thread via GitHub


philipnee commented on PR #15961:
URL: https://github.com/apache/kafka/pull/15961#issuecomment-2151036734

   @appchemist - was away for a few days so sorry for not responding.  there 
are two points to address before moving the pr forward
   1. we definitely don't need to implement the request manager, apologies for 
not clarifying the comment. i meant to suggest wrapping metadata and a 
background queue in an object so that we can propagate the error after polling 
the network client. 
   2. we need an integration test for invalid topic exception.
   
   what's your opinion on the approach @lianetm  suggested?





Re: [PR] KAFKA-16764: New consumer should throw InvalidTopicException on poll when invalid topic in metadata. [kafka]

2024-06-05 Thread via GitHub


appchemist commented on PR #15961:
URL: https://github.com/apache/kafka/pull/15961#issuecomment-2151024735

   @lianetm No problem, I enjoyed the discussion.
   @kirktrue Thanks for review.





Re: [PR] KAFKA-16764: New consumer should throw InvalidTopicException on poll when invalid topic in metadata. [kafka]

2024-06-05 Thread via GitHub


philipnee commented on code in PR #15961:
URL: https://github.com/apache/kafka/pull/15961#discussion_r1628483904


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MetadataErrorManager.java:
##
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.Metadata;
+import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult;
+import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
+import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
+
+public class MetadataErrorManager implements RequestManager {

Review Comment:
   @appchemist - you might have misunderstood my initial comment.  there's no 
"request" per se so we don't need to wrap the manager in a RequestManager. So 
let's remove that.






Re: [PR] KAFKA-16764: New consumer should throw InvalidTopicException on poll when invalid topic in metadata. [kafka]

2024-06-05 Thread via GitHub


lianetm commented on PR #15961:
URL: https://github.com/apache/kafka/pull/15961#issuecomment-2150713653

   Hey @kirktrue , before continuing reviewing this PR, we had some concerns 
about the approach, and were considering a simpler alternative, see [comment 
above](https://github.com/apache/kafka/pull/15961#issuecomment-2150487736). Let 
us know what you think. Thanks!





Re: [PR] KAFKA-16764: New consumer should throw InvalidTopicException on poll when invalid topic in metadata. [kafka]

2024-06-05 Thread via GitHub


kirktrue commented on code in PR #15961:
URL: https://github.com/apache/kafka/pull/15961#discussion_r1628244915


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MetadataErrorManager.java:
##
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.Metadata;
+import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult;
+import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
+import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
+
+public class MetadataErrorManager implements RequestManager {

Review Comment:
   We shouldn't need to make this a `RequestManager`. Can we just create an 
instance of the `MetadataErrorManager` from the `ConsumerNetworkThread`, and 
then invoke `maybePropagateMetadataError` at the appropriate point inside 
`runOnce()`?



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MetadataErrorManager.java:
##
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.Metadata;
+import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult;
+import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
+import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
+
+public class MetadataErrorManager implements RequestManager {
+private final Metadata metadata;
+private final BackgroundEventHandler backgroundEventHandler;
+
+public MetadataErrorManager(Metadata metadata, BackgroundEventHandler backgroundEventHandler) {
+assert metadata != null;
+assert backgroundEventHandler != null;
+
+this.metadata = metadata;
+this.backgroundEventHandler = backgroundEventHandler;
+}
+
+@Override
+public PollResult poll(long currentTimeMs) {
+maybePropagateMetadataError();
+
+return PollResult.EMPTY;
+}
+
+private void maybePropagateMetadataError() {
+try {
+metadata.maybeThrowAnyException();
+} catch (Exception e) {
+backgroundEventHandler.add(new ErrorEvent(e));
+}

Review Comment:
   Ideally these five lines could just be in `ConsumerNetworkThread`, either 
inline in `runOnce()` or as a separate method.



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MetadataErrorManager.java:
##
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+

Re: [PR] KAFKA-16764: New consumer should throw InvalidTopicException on poll when invalid topic in metadata. [kafka]

2024-06-05 Thread via GitHub


lianetm commented on PR #15961:
URL: https://github.com/apache/kafka/pull/15961#issuecomment-2150487736

   Oh I totally understand you now with that other PR and the feedback there. I 
see @philipnee initially suggested the managers approach but I believe we've 
come to the point after the discussion and changes on this PR where that 
approach looks like an overkill and it introduces a difference in behaviour 
with the legacy logic ([comments 
above](https://github.com/apache/kafka/pull/15961#issuecomment-2148155631)). If 
you agree @philipnee let's maybe close this PR and move back to the simpler 
approach of https://github.com/apache/kafka/pull/16043. 
   
   @appchemist I'll start reviewing https://github.com/apache/kafka/pull/16043 
right away just to gain time, we are running against the clock to make sure 
this makes it into the 3.8 cut next week. Sorry for the confusion, but seems 
like we're heading in the right direction now with the other PR and a better 
understanding of the drawbacks of the managers approach, good progress! Thanks!





Re: [PR] KAFKA-16764: New consumer should throw InvalidTopicException on poll when invalid topic in metadata. [kafka]

2024-06-05 Thread via GitHub


appchemist commented on PR #15961:
URL: https://github.com/apache/kafka/pull/15961#issuecomment-2150328159

   Hey @lianetm
   
   Sorry about that. I created the simple version as a separate PR, and I 
think that's why the feedback on it wasn't shared here.
   
   Firstly, I understand the differences you mentioned.
   Here is the feedback I received on the downsides of the simple version:
   https://github.com/apache/kafka/pull/16043#issuecomment-2127634542
   https://github.com/apache/kafka/pull/16043#issuecomment-2130547360
   
   If there are issues with the differences you described, I think I should go 
with the approach you mentioned.





Re: [PR] KAFKA-16764: New consumer should throw InvalidTopicException on poll when invalid topic in metadata. [kafka]

2024-06-05 Thread via GitHub


lianetm commented on PR #15961:
URL: https://github.com/apache/kafka/pull/15961#issuecomment-2149990136

   Hey @appchemist, agree that as long as we continue polling the 
ConsumerNetworkThread we should get the error thrown (on the next poll after it 
was detected), but the inverted order we get with the managers approach 
(maybeThrowAnyException and then poll to detect the error to throw) would 
matter in cases when there's no such next poll right? I'm thinking about close 
for example, that we do one last poll (may detect metadata errors) and then we 
don't poll again (so it won't be thrown). The legacy consumer wouldn't behave 
like that I expect. Makes sense?
   
   If we agree that the managers approach seems a bit complicated and has an 
inverted order that brings a difference, why do we still prefer it over the 
simpler approach that is closer to the legacy logic? Do you see a downside to 
the simpler approach? (I may be missing it). Thanks! 





Re: [PR] KAFKA-16764: New consumer should throw InvalidTopicException on poll when invalid topic in metadata. [kafka]

2024-06-05 Thread via GitHub


appchemist commented on PR #15961:
URL: https://github.com/apache/kafka/pull/15961#issuecomment-2149560613

   @lianetm Thanks for review!
   
   I agree that the manager approach is a bit of an overkill.
   
   On the timing difference from the legacy consumer, here is how I see it.
   At the code level:
   For the legacy consumer, the code order is to call 
`Metadata.maybeThrowAnyException()` roughly after `KafkaClient.poll()`.
   For the manager approach, the code order is to call `KafkaClient.poll()` 
roughly after `Metadata.maybeThrowAnyException()`.
   So they look semantically different.
   
   However, when viewed at the `ConsumerNetworkThread` level, this is roughly 
how it works:
   ```
   // in ConsumerNetworkThread.run()
   while (running) {
       // (other steps omitted)
       Metadata.maybeThrowAnyException();
       KafkaClient.poll();
   }
   ```
   If we only look at the calls to `KafkaClient.poll()` and 
`Metadata.maybeThrowAnyException()`, which is what we're interested in, the 
sequence looks like this:
   `Metadata.maybeThrowAnyException() -> KafkaClient.poll() -> 
Metadata.maybeThrowAnyException() -> ...`
   Therefore, I think there is no significant difference in actual timing, but 
the inverted order is bad for code readability.
   
   Please let me know if I've misunderstood anything.





Re: [PR] KAFKA-16764: New consumer should throw InvalidTopicException on poll when invalid topic in metadata. [kafka]

2024-06-04 Thread via GitHub


lianetm commented on PR #15961:
URL: https://github.com/apache/kafka/pull/15961#issuecomment-2148155631

   Hey all, I still have concerns with this approach of a manager just to 
propagate the metadata errors. Not only that it still feels a bit like an 
overkill, but I'm worried it brings a fundamental timing difference with the 
legacy consumer.
   
   For both consumers, metadata errors are identified (saved to be propagated) 
on the NetworkClient when a metadata response is received 
([here](https://github.com/apache/kafka/blob/55d38efcc5505a5a1bddb08ba05f4d923f8050f9/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L942)).
 Then:
   - the legacy code throws metadata errors on the same poll iteration that 
discovered the error (detect error 
[here](https://github.com/apache/kafka/blob/55d38efcc5505a5a1bddb08ba05f4d923f8050f9/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java#L276-L284),
 throw it right after in the same poll 
([here](https://github.com/apache/kafka/blob/55d38efcc5505a5a1bddb08ba05f4d923f8050f9/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java#L315)))
 
   - new code (with the current PR approach), would throw the metadata error on 
the next poll after the one that detected it (just because the 
ConsumerNetworkThread polls the managers before polling the network client, see 
[here](https://github.com/apache/kafka/blob/55d38efcc5505a5a1bddb08ba05f4d923f8050f9/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java#L144-L147)).
 
   
   Is there a concern with the alternative simple approach [suggested 
above](https://github.com/apache/kafka/pull/15961#issuecomment-2135478481)? 
Feels simpler and keeps to the semantics of the legacy client, ensuring that 
the same network layer poll that detects a metadata error will propagate it. 
Thoughts?
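
The timing difference described above can be reproduced with a toy model. The following is a minimal sketch, assuming a stand-in `FakeMetadata` that stores one pending error and clears it when it is rethrown. `PollOrderingDemo`, `run`, and the parameters `checkAfterPoll`, `iterations`, and `errorAt` are hypothetical names for illustration and are not Kafka classes or APIs.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of the two orderings discussed above; none of these types are Kafka classes. */
class PollOrderingDemo {

    /** Stand-in for Metadata: holds one pending error, then clears and rethrows it on demand. */
    static final class FakeMetadata {
        private RuntimeException pending;

        void recordError(RuntimeException e) {
            pending = e;
        }

        void maybeThrowAnyException() {
            RuntimeException e = pending;
            pending = null;
            if (e != null) {
                throw e;
            }
        }
    }

    /**
     * Runs the loop for the given number of passes. The simulated network poll in pass
     * errorAt records a metadata error. Returns the pass numbers in which the error
     * was propagated.
     */
    static List<Integer> run(boolean checkAfterPoll, int iterations, int errorAt) {
        FakeMetadata metadata = new FakeMetadata();
        List<Integer> propagatedAt = new ArrayList<>();
        for (int pass = 1; pass <= iterations; pass++) {
            if (!checkAfterPoll) {
                check(metadata, propagatedAt, pass); // manager-style: check before the poll
            }
            if (pass == errorAt) {
                // Simulated network poll that receives a bad metadata response.
                metadata.recordError(new IllegalStateException("invalid topic"));
            }
            if (checkAfterPoll) {
                check(metadata, propagatedAt, pass); // legacy-style: check after the poll
            }
        }
        return propagatedAt;
    }

    private static void check(FakeMetadata metadata, List<Integer> propagatedAt, int pass) {
        try {
            metadata.maybeThrowAnyException();
        } catch (RuntimeException e) {
            propagatedAt.add(pass);
        }
    }

    public static void main(String[] args) {
        System.out.println("check after poll, error in pass 2 of 3:  " + run(true, 3, 2));
        System.out.println("check before poll, error in pass 2 of 3: " + run(false, 3, 2));
        System.out.println("check after poll, error in last pass:    " + run(true, 3, 3));
        System.out.println("check before poll, error in last pass:   " + run(false, 3, 3));
    }
}
```

In this model, checking after the poll reports the error in the same pass that detected it, while checking before the poll reports it one pass later. When the error is detected in the final pass (as in the close scenario raised in this thread), the check-before-poll ordering returns an empty list, meaning the error is never propagated.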





Re: [PR] KAFKA-16764: New consumer should throw InvalidTopicException on poll when invalid topic in metadata. [kafka]

2024-06-04 Thread via GitHub


appchemist commented on PR #15961:
URL: https://github.com/apache/kafka/pull/15961#issuecomment-2147617237

   @philipnee if you have a moment, please take a look





Re: [PR] KAFKA-16764: New consumer should throw InvalidTopicException on poll when invalid topic in metadata [kafka]

2024-05-30 Thread via GitHub


appchemist commented on code in PR #15961:
URL: https://github.com/apache/kafka/pull/15961#discussion_r1620884853


##
clients/src/main/java/org/apache/kafka/clients/ClientUtils.java:
##
@@ -172,7 +173,37 @@ public static NetworkClient createNetworkClient(AbstractConfig config,
 null,
 new DefaultHostResolver(),
 throttleTimeSensor,
-clientTelemetrySender);
+clientTelemetrySender,
+Function.identity());

Review Comment:
   @philipnee I've applied it






Re: [PR] KAFKA-16764: New consumer should throw InvalidTopicException on poll when invalid topic in metadata [kafka]

2024-05-29 Thread via GitHub


appchemist commented on code in PR #15961:
URL: https://github.com/apache/kafka/pull/15961#discussion_r1618318070


##
clients/src/main/java/org/apache/kafka/clients/ClientUtils.java:
##
@@ -172,7 +173,37 @@ public static NetworkClient createNetworkClient(AbstractConfig config,
 null,
 new DefaultHostResolver(),
 throttleTimeSensor,
-clientTelemetrySender);
+clientTelemetrySender,
+Function.identity());

Review Comment:
   Are you suggesting creating a RequestManager for Metadata Errors?
   
   In my understanding, the role of a RequestManager is to manage outgoing 
requests. 
   However, looking at the `MembershipManagerImpl`, it doesn't seem like its role 
is necessarily limited to requests.
   By adopting this approach, we could propagate metadata errors while 
maintaining the structure of the new consumer.
   So I think it's a good idea.






Re: [PR] KAFKA-16764: New consumer should throw InvalidTopicException on poll when invalid topic in metadata [kafka]

2024-05-29 Thread via GitHub


appchemist commented on PR #15961:
URL: https://github.com/apache/kafka/pull/15961#issuecomment-2136860702

   To summarize, there seem to be three main places where metadata errors 
could be handled:
   
   - `ConsumerNetworkThread`
   - `NetworkClientDelegate`
   - `KafkaClient`
  - I think we all agree that the location is too deep and low-level for 
handling Metadata Errors.
   
   ### When handled by `NetworkClientDelegate`
   For legacy Consumers, `ConsumerNetworkClient.poll` is propagating the error.
   For new Consumers, `NetworkClientDelegate` appears to provide high-level 
access to `KafkaClient`, like `ConsumerNetworkClient`.
   From this perspective, handling Metadata Errors in 
`NetworkClientDelegate.poll` can be considered analogous to the approach in the 
legacy Consumer.
   Since both legacy and new Consumers handle error at the same level, this is 
considered an expected location.
   I think that's what the people who commented about handling it here were 
focusing on.
   What do you think? @philipnee 
   
   ### When handled by `ConsumerNetworkThread`
   Handling the error in `NetworkClientDelegate.poll`, which is called inside 
`ConsumerNetworkThread.runOnce`, hides the intention of error propagation.
   It also adds the responsibility of propagating metadata errors to the 
delegate's existing responsibility of interfacing with the `NetworkClient`.
   But `ConsumerNetworkThread.runOnce` explicitly defines and runs the 
background thread's tasks,
   so there is an opinion that a metadata error manager should be created at 
the `ConsumerNetworkThread` level.
   What do you think? @lianetm & @kirktrue 
   
   ### My Opinion
   My suggestion would be to add a `RequestManager` for metadata errors at the 
`ConsumerNetworkThread` level.
   This keeps the `NetworkClientDelegate` focused solely on interfacing 
with the network client and handles the errors within `ConsumerNetworkThread`.
   
   Please let me know if I've misunderstood anything





Re: [PR] KAFKA-16764: New consumer should throw InvalidTopicException on poll when invalid topic in metadata [kafka]

2024-05-29 Thread via GitHub


appchemist commented on code in PR #15961:
URL: https://github.com/apache/kafka/pull/15961#discussion_r1618318070


##
clients/src/main/java/org/apache/kafka/clients/ClientUtils.java:
##
@@ -172,7 +173,37 @@ public static NetworkClient createNetworkClient(AbstractConfig config,
 null,
 new DefaultHostResolver(),
 throttleTimeSensor,
-clientTelemetrySender);
+clientTelemetrySender,
+Function.identity());

Review Comment:
   Are you suggesting creating a RequestManager for Metadata Errors?
   
   In my understanding, the role of a RequestManager is to manage outgoing 
requests. 
   However, looking at the MembershipManagerImpl, it seems like a good idea. 
   By adopting this approach, we could propagate Metadata Error while 
maintaining the structure of the new Consumer.






Re: [PR] KAFKA-16764: New consumer should throw InvalidTopicException on poll when invalid topic in metadata [kafka]

2024-05-28 Thread via GitHub


philipnee commented on code in PR #15961:
URL: https://github.com/apache/kafka/pull/15961#discussion_r1618032800


##
clients/src/main/java/org/apache/kafka/clients/ClientUtils.java:
##
@@ -172,7 +173,37 @@ public static NetworkClient createNetworkClient(AbstractConfig config,
 null,
 new DefaultHostResolver(),
 throttleTimeSensor,
-clientTelemetrySender);
+clientTelemetrySender,
+Function.identity());

Review Comment:
   i think we are going pretty deep to access the metadata error, i wonder if 
we could simplify the pattern.
   
   we can probably directly access the metadata error in the background thread 
and send it to the client, in this case we don't need to create a separate 
wrapper around the updater.
   
   wdyt if we just create a metadata error manager in the consumer network 
thread level and poll the metadata error directly?






Re: [PR] KAFKA-16764: New consumer should throw InvalidTopicException on poll when invalid topic in metadata [kafka]

2024-05-28 Thread via GitHub


lianetm commented on PR #15961:
URL: https://github.com/apache/kafka/pull/15961#issuecomment-2135478481

   Hey @appchemist , thanks for the updates! High level comment with the goal 
of trying to simplify if possible. I don't quite get the need for a new way of 
updating metadata (`ErrorPropagateMetadataUpdater`) when it seems to me that 
the way metadata is updated is not different for the new consumer vs the legacy 
one. What's different here is how the errors are propagated (and that's what we 
need to sort out). What about the option of just keeping the metadata update 
logic as it was, and only changing the propagation logic:
   1. call a new `maybeThrowMetadataErrors()` in the `NetworkClientDelegate` 
`poll()`
   2. define `maybeThrowMetadataErrors` simply as:
   
   ```
   private void maybeThrowMetadataErrors() {
       try {
           metadata.maybeThrowAnyException();
       } catch (Exception e) {
           backgroundEventHandler.add(new ErrorEvent(e));
       }
   }
   ```
   
   With this, we know that metadata updates will be applied in the same way for 
both consumer impl (ConsumerNetworkClient and NetworkClientDelegate), and the 
only difference is how the error is propagated (directly thrown in the former 
vs propagated via events in the latter). Does this make sense? 
   
   Regardless of the impl, I suggest we also add a test in the 
`NetworkClientDelegate`, mocking metadata errors, and seeing how polling the 
network client should generate the `ErrorEvent`. 
   
   Thanks!
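
The test suggested above (mock a metadata error, poll, and expect an `ErrorEvent`) could look roughly like the following. This is a dependency-free sketch under stated assumptions: `MetadataSource`, `FakeDelegate`, `pollOnce`, and the local `ErrorEvent` class are hypothetical stand-ins written for this example, not the real Kafka types, and the real network polling is omitted entirely.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Hypothetical, dependency-free sketch of the suggested test; every type here is a stand-in. */
class DelegateMetadataErrorTestSketch {

    /** Stand-in for the single Metadata method the delegate needs. */
    interface MetadataSource {
        void maybeThrowAnyException();
    }

    /** Stand-in for ErrorEvent: it only carries the captured exception. */
    static final class ErrorEvent {
        final Exception error;

        ErrorEvent(Exception error) {
            this.error = error;
        }
    }

    /** Stand-in for the delegate: poll() would do network work, then checks metadata. */
    static final class FakeDelegate {
        private final MetadataSource metadata;
        private final Queue<ErrorEvent> backgroundEvents;

        FakeDelegate(MetadataSource metadata, Queue<ErrorEvent> backgroundEvents) {
            this.metadata = metadata;
            this.backgroundEvents = backgroundEvents;
        }

        void poll() {
            // Real network polling is omitted in this sketch.
            maybeThrowMetadataErrors();
        }

        private void maybeThrowMetadataErrors() {
            try {
                metadata.maybeThrowAnyException();
            } catch (Exception e) {
                backgroundEvents.add(new ErrorEvent(e));
            }
        }
    }

    /** Polls once against the given metadata and returns the events that were queued. */
    static Queue<ErrorEvent> pollOnce(MetadataSource metadata) {
        Queue<ErrorEvent> events = new ArrayDeque<>();
        new FakeDelegate(metadata, events).poll();
        return events;
    }

    public static void main(String[] args) {
        RuntimeException boom = new IllegalArgumentException("invalid topic");
        Queue<ErrorEvent> withError = pollOnce(() -> { throw boom; });
        Queue<ErrorEvent> withoutError = pollOnce(() -> { });

        if (withError.size() != 1 || withError.peek().error != boom) {
            throw new AssertionError("expected exactly one event wrapping the metadata error");
        }
        if (!withoutError.isEmpty()) {
            throw new AssertionError("expected no event when metadata has no error");
        }
        System.out.println("both checks passed");
    }
}
```

In the actual test class the stand-in would presumably be replaced by a mocked `Metadata` (for example with Mockito's `doThrow(...).when(metadata).maybeThrowAnyException()`); that detail is an assumption about the test setup and is not stated in the thread.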





Re: [PR] KAFKA-16764: New consumer should throw InvalidTopicException on poll when invalid topic in metadata [kafka]

2024-05-28 Thread via GitHub


appchemist commented on PR #15961:
URL: https://github.com/apache/kafka/pull/15961#issuecomment-2134918869

   kindly ping @philipnee 





Re: [PR] KAFKA-16764: New consumer should throw InvalidTopicException on poll when invalid topic in metadata [kafka]

2024-05-24 Thread via GitHub


appchemist closed pull request #16043: KAFKA-16764: New consumer should throw 
InvalidTopicException on poll when invalid topic in metadata
URL: https://github.com/apache/kafka/pull/16043





Re: [PR] KAFKA-16764: New consumer should throw InvalidTopicException on poll when invalid topic in metadata [kafka]

2024-05-24 Thread via GitHub


appchemist commented on PR #16043:
URL: https://github.com/apache/kafka/pull/16043#issuecomment-2130614939

   Hey @philipnee
   I got it!
   
   I think the confusion is because I created two PRs.
   In my view, this PR is far from our priorities, so I'll close it.
   I'm sorry, but could you please review 
https://github.com/apache/kafka/pull/15961 (the other PR)?
   I've only written unit tests for it.
   I want to make sure it's where we think it should be and create integration 
tests as well.
   Also, I couldn't find the "integration test module" you mentioned. Could you 
give me the package name or class name?





Re: [PR] KAFKA-16764: New consumer should throw InvalidTopicException on poll when invalid topic in metadata [kafka]

2024-05-24 Thread via GitHub


philipnee commented on PR #16043:
URL: https://github.com/apache/kafka/pull/16043#issuecomment-2130547360

   Hey @appchemist -- I mistakenly thought this PR was #15961, sorry.  If you 
read ConsumerNetworkThread#runOnce() you can see the original intention was to 
explicitly define and execute the background thread operations.  While putting 
metadata error propagation in the network delegate might reduce code, it also 
hides the intention (of checking metadata error).  This is my thought:
   
   ```
   void runOnce() {
   ...
   applicationEventProcessor.process();
   
   final long currentTimeMs = time.milliseconds();
   final long pollWaitTimeMs = ...
   networkClientDelegate.poll(pollWaitTimeMs, currentTimeMs);
   metadataErrorHandler.maybeEmitError();
   cachedMaximumTimeToWait = ...
   }
   ```
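
The ordering above can be sketched in a self-contained form. This is only an
illustration under stated assumptions: `MetadataErrorHandler`,
`BackgroundLoopSketch`, the `Supplier` that yields a pending error, and the
plain `Queue` standing in for the background event queue are all hypothetical
names, not Kafka classes.

```java
import java.util.Optional;
import java.util.Queue;
import java.util.function.Supplier;

// Hypothetical stand-in for the separate module suggested above; not part of Kafka.
final class MetadataErrorHandler {
    private final Supplier<Optional<RuntimeException>> pendingError;
    private final Queue<RuntimeException> backgroundEventQueue;

    MetadataErrorHandler(Supplier<Optional<RuntimeException>> pendingError,
                         Queue<RuntimeException> backgroundEventQueue) {
        this.pendingError = pendingError;
        this.backgroundEventQueue = backgroundEventQueue;
    }

    // Forwards a pending metadata error, if any, to the foreground-facing queue.
    void maybeEmitError() {
        pendingError.get().ifPresent(backgroundEventQueue::add);
    }
}

// Hypothetical, simplified background loop: every step is an explicit call.
final class BackgroundLoopSketch {
    private final Runnable networkPoll;
    private final MetadataErrorHandler metadataErrorHandler;

    BackgroundLoopSketch(Runnable networkPoll, MetadataErrorHandler metadataErrorHandler) {
        this.networkPoll = networkPoll;
        this.metadataErrorHandler = metadataErrorHandler;
    }

    void runOnce() {
        networkPoll.run();                     // stands in for networkClientDelegate.poll(...)
        metadataErrorHandler.maybeEmitError(); // the metadata check stays visible in the loop
    }
}
```

The point of the separate handler is that the network delegate keeps a single
job, while the loop body still shows when the metadata error is checked.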





Re: [PR] KAFKA-16764: New consumer should throw InvalidTopicException on poll when invalid topic in metadata [kafka]

2024-05-23 Thread via GitHub


appchemist commented on PR #16043:
URL: https://github.com/apache/kafka/pull/16043#issuecomment-2128669455

   > i really want to keep the network client delegate to just interfacing with 
the network client, so i wonder if it would be a better design to handle the 
metadata error in a separated module. wdyt?
   
   I think there are pros and cons to each approach.
   
   Current Patch:
   Pros:
   - Simple
     - Easier to understand: similar to the legacy code
   
   Cons:
   - Violates SRP
     - `NetworkClientDelegate` will also have non-interfacing behaviour.
   
   Separate module (`PropagateErrorMetadataUpdater` in 
https://github.com/apache/kafka/pull/15961):
   Pros:
   - Adheres to SOLID principles
   
   Cons:
   - Can be more complex
   - Acts on the client network layer.
   
   I'm having difficulty determining which aspect should be prioritized.
   As you said, this made me think of design as the priority.
   
   Our goal is to propagate metadata errors from the background thread to the 
foreground thread at the appropriate time.
   `MetadataUpdater` seems to play a "role" in handling metadata changes, 
making it a suitable candidate for propagating metadata errors.
   `PropagateErrorMetadataUpdater` acts as a proxy for handling metadata errors.
   We've taken an approach that extends metadata error handling without 
modifying `NetworkClient`'s logic.
   
   What do you think?
   
   You can see the approach in https://github.com/apache/kafka/pull/15961, and I 
would appreciate your review.
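
To make the proxy idea concrete, here is a minimal sketch of a decorator that
reports metadata errors without touching the wrapped updater's logic. It is
not the code from the PR: `SimpleMetadataUpdater` is a hypothetical one-method
interface (Kafka's real `MetadataUpdater` has more methods), and
`ErrorPropagatingUpdater` and the `errorSink` callback are assumed names used
only for illustration.

```java
import java.util.Optional;
import java.util.function.Consumer;

// Simplified, hypothetical interface; Kafka's real MetadataUpdater has more methods.
interface SimpleMetadataUpdater {
    // Processes a metadata response and returns an error if the response carried one.
    Optional<RuntimeException> handleResponse(String response);
}

// Proxy that forwards each call to the wrapped updater and reports any error it sees.
final class ErrorPropagatingUpdater implements SimpleMetadataUpdater {
    private final SimpleMetadataUpdater delegate;
    private final Consumer<RuntimeException> errorSink;

    ErrorPropagatingUpdater(SimpleMetadataUpdater delegate, Consumer<RuntimeException> errorSink) {
        this.delegate = delegate;
        this.errorSink = errorSink;
    }

    @Override
    public Optional<RuntimeException> handleResponse(String response) {
        Optional<RuntimeException> error = delegate.handleResponse(response);
        error.ifPresent(errorSink); // propagate without changing the delegate's logic
        return error;
    }
}
```

In this sketch the `errorSink` would be whatever hands the error to the
foreground thread, for example a method that adds it to a queue.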





Re: [PR] KAFKA-16764: New consumer should throw InvalidTopicException on poll when invalid topic in metadata [kafka]

2024-05-23 Thread via GitHub


appchemist commented on PR #16043:
URL: https://github.com/apache/kafka/pull/16043#issuecomment-2128663846

   Hi @philipnee, thank you for the review!
   
   > do you think we can put the metadata error in the ConsumerNetworkThread 
and have an exception handler to relay the error back to the user via the 
background queue?
   
   Yes, I know. However, I would appreciate your advice.





Re: [PR] KAFKA-16764: New consumer should throw InvalidTopicException on poll when invalid topic in metadata [kafka]

2024-05-23 Thread via GitHub


philipnee commented on PR #16043:
URL: https://github.com/apache/kafka/pull/16043#issuecomment-2127634542

   hey @appchemist - thanks for the patch.  do you think we can put the 
metadata error in the ConsumerNetworkThread and have an exception handler to 
relay the error back to the user via the background queue? i really want to 
keep the network client delegate to just interfacing with the network client, 
so i wonder if it would be a better design to handle the metadata error in a 
separated module.  wdyt?





Re: [PR] KAFKA-16764: New consumer should throw InvalidTopicException on poll when invalid topic in metadata [kafka]

2024-05-23 Thread via GitHub


philipnee commented on code in PR #16043:
URL: https://github.com/apache/kafka/pull/16043#discussion_r1612009583


##
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java:
##
@@ -2952,8 +2952,8 @@ private static class FetchInfo {
 // TODO: this test requires rebalance logic which is not yet implemented 
in the CONSUMER group protocol.
 //   Once it is implemented, this should use both group protocols.
 @ParameterizedTest
-@EnumSource(value = GroupProtocol.class, names = "CLASSIC")
-public void testSubscriptionOnInvalidTopic(GroupProtocol groupProtocol) {
+@EnumSource(value = GroupProtocol.class)
+public void testSubscriptionOnInvalidTopic(GroupProtocol groupProtocol) 
throws InterruptedException {

Review Comment:
   i wonder if we should just move these tests to integration tests. In this 
case, KafkaConsumerTest becomes a huge integration test.  
   
   ideally - we want to make sure the tests are deterministic and Thread.sleep 
can potentially introduce flakiness, so i would conduct the test differently, 
i.e.:
   
   1. mock the background queue to ensure the right exception is being thrown 
during the poll
   2. add an integration test to test the exception (see the integration test 
module)
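
As a rough illustration of step 1, the sketch below enqueues the error
directly so that the poll path can be checked without a background thread or a
sleep. Every name in it (`SketchInvalidTopicException`, `QueueDrivenConsumer`,
`QueueDrivenConsumerCheck`) is hypothetical and stands in for the real
consumer and exception types; a real test would use the project's own mocks.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical stand-in for the exception type under test.
final class SketchInvalidTopicException extends RuntimeException {
    SketchInvalidTopicException(String message) {
        super(message);
    }
}

// Hypothetical, minimal consumer: poll() rethrows the first error on the queue.
final class QueueDrivenConsumer {
    private final Queue<RuntimeException> backgroundEventQueue;

    QueueDrivenConsumer(Queue<RuntimeException> backgroundEventQueue) {
        this.backgroundEventQueue = backgroundEventQueue;
    }

    void poll() {
        RuntimeException error = backgroundEventQueue.poll();
        if (error != null) {
            throw error;
        }
    }
}

// Test-style helper: the error is placed on the queue by the test itself,
// so the outcome does not depend on thread timing.
final class QueueDrivenConsumerCheck {
    static boolean pollThrowsInvalidTopic() {
        Queue<RuntimeException> queue = new ConcurrentLinkedQueue<>();
        queue.add(new SketchInvalidTopicException("invalid topic in metadata"));
        QueueDrivenConsumer consumer = new QueueDrivenConsumer(queue);
        try {
            consumer.poll();
            return false;
        } catch (SketchInvalidTopicException expected) {
            return true;
        }
    }
}
```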






Re: [PR] KAFKA-16764: New consumer should throw InvalidTopicException on poll when invalid topic in metadata [kafka]

2024-05-23 Thread via GitHub


appchemist commented on PR #16043:
URL: https://github.com/apache/kafka/pull/16043#issuecomment-2127053686

   @philipnee & @kirktrue If you have a moment, please take a look.





Re: [PR] KAFKA-16764: New consumer should throw InvalidTopicException on poll when invalid topic in metadata [kafka]

2024-05-23 Thread via GitHub


appchemist commented on code in PR #16043:
URL: https://github.com/apache/kafka/pull/16043#discussion_r1611653408


##
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java:
##
@@ -2974,6 +2974,7 @@ public void testSubscriptionOnInvalidTopic(GroupProtocol 
groupProtocol) {
 KafkaConsumer consumer = newConsumer(groupProtocol, 
time, client, subscription, metadata, assignor, true, groupInstanceId);
 consumer.subscribe(singleton(invalidTopicName), 
getConsumerRebalanceListener(consumer));
 
+Thread.sleep(100);

Review Comment:
   A `sleep` was added to ensure that `ConsumerNetworkThread.runOnce` executes 
at least once before the test ends.
   Please review.
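
For comparison, one alternative to a fixed sleep is to wait on an explicit
signal with a timeout. The sketch below uses a `CountDownLatch` from the JDK;
`RunOnceSignal` and its method names are hypothetical, and it assumes the
background loop can be given a hook to call after its first iteration, which
is not something the patch provides.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: lets a test block until the background loop has run once.
final class RunOnceSignal {
    private final CountDownLatch firstRun = new CountDownLatch(1);

    // Called by the background loop after an iteration completes.
    void onRunOnceCompleted() {
        firstRun.countDown();
    }

    // Returns true as soon as the first iteration has completed, or false on
    // timeout or interruption. The wait ends early instead of always taking
    // the full duration, unlike a fixed sleep.
    boolean awaitFirstRun(long timeoutMs) {
        try {
            return firstRun.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return false;
        }
    }
}
```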






[PR] KAFKA-16764: New consumer should throw InvalidTopicException on poll when invalid topic in metadata [kafka]

2024-05-23 Thread via GitHub


appchemist opened a new pull request, #16043:
URL: https://github.com/apache/kafka/pull/16043

   Propagate metadata error from background thread to foreground thread through 
backgroundEventQueue.
   
   Due to a different approach from https://github.com/apache/kafka/pull/15961, 
a new PR has been created.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





Re: [PR] KAFKA-16764: New consumer should throw InvalidTopicException on poll when invalid topic in metadata [kafka]

2024-05-22 Thread via GitHub


appchemist commented on PR #15961:
URL: https://github.com/apache/kafka/pull/15961#issuecomment-2125950049

   @lianetm I got it, thank you!
   @kirktrue Right! I think what you said is simple and more intuitive.





Re: [PR] KAFKA-16764: New consumer should throw InvalidTopicException on poll when invalid topic in metadata [kafka]

2024-05-22 Thread via GitHub


lianetm commented on PR #15961:
URL: https://github.com/apache/kafka/pull/15961#issuecomment-2125262436

   Hey @appchemist, thanks for the updates! Sorry, I'm caught up in other tasks, 
but we discussed offline and @philipnee will follow up here to help with 
reviews on this one. Thanks!

