Re: [PR] KAFKA-15544: Enable integration tests for new consumer [kafka]

2023-11-28 Thread via GitHub


lucasbru merged PR #14758:
URL: https://github.com/apache/kafka/pull/14758


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15544: Enable integration tests for new consumer [kafka]

2023-11-28 Thread via GitHub


lucasbru commented on PR #14758:
URL: https://github.com/apache/kafka/pull/14758#issuecomment-1830668394

   I created 3 new flaky test tickets, but all failed tests were flaky before 
on master.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15544: Enable integration tests for new consumer [kafka]

2023-11-28 Thread via GitHub


lucasbru commented on PR #14758:
URL: https://github.com/apache/kafka/pull/14758#issuecomment-1830071627

   CI is blocked by a hanging test bug that is going to celebrate its fourth 
birthday soon: https://issues.apache.org/jira/browse/KAFKA-9470. I restarted CI 
and submitted https://github.com/apache/kafka/pull/14855


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15544: Enable integration tests for new consumer [kafka]

2023-11-27 Thread via GitHub


lucasbru commented on code in PR #14758:
URL: https://github.com/apache/kafka/pull/14758#discussion_r1405954797


##
core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala:
##
@@ -36,14 +36,15 @@ import scala.collection.Seq
  */
 abstract class BaseConsumerTest extends AbstractConsumerTest {
 
-  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
-  @ValueSource(strings = Array("zk", "kraft", "kraft+kip848"))
-  def testSimpleConsumption(quorum: String): Unit = {
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testSimpleConsumption(quorum: String, groupProtocol: String): Unit = {
 val numRecords = 1
 val producer = createProducer()
 val startingTimestamp = System.currentTimeMillis()
 sendRecords(producer, numRecords, tp, startingTimestamp = 
startingTimestamp)
 
+this.consumerConfig.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, 
groupProtocol)

Review Comment:
   Can we remove this line now?



##
core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala:
##
@@ -69,6 +70,7 @@ abstract class BaseConsumerTest extends AbstractConsumerTest {
 sendRecords(producer, numRecords, tp, startingTimestamp = 
startingTimestamp)
 
 val consumerProps = new Properties()
+this.consumerConfig.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, 
groupProtocol)

Review Comment:
   Can we remove this line now?



##
core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala:
##
@@ -78,10 +80,12 @@ abstract class BaseConsumerTest extends 
AbstractConsumerTest {
 assertNotEquals(0, BaseConsumerTest.updateConsumerCount.get())
   }
 
-  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
-  @ValueSource(strings = Array("zk", "kraft", "kraft+kip848"))
-  def testCoordinatorFailover(quorum: String): Unit = {
+  // ConsumerRebalanceListener temporarily not supported for consumer group 
protocol
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersGenericGroupProtocolOnly"))
+  def testCoordinatorFailover(quorum: String, groupProtocol: String): Unit = {
 val listener = new TestConsumerReassignmentListener()
+this.consumerConfig.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, 
groupProtocol)

Review Comment:
   can we remove this line now?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15544: Enable integration tests for new consumer [kafka]

2023-11-24 Thread via GitHub


AndrewJSchofield commented on code in PR #14758:
URL: https://github.com/apache/kafka/pull/14758#discussion_r1404635960


##
core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala:
##
@@ -36,14 +36,15 @@ import scala.collection.Seq
  */
 abstract class BaseConsumerTest extends AbstractConsumerTest {
 
-  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
-  @ValueSource(strings = Array("zk", "kraft", "kraft+kip848"))
-  def testSimpleConsumption(quorum: String): Unit = {
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testSimpleConsumption(quorum: String, groupProtocol: String): Unit = {
 val numRecords = 1
 val producer = createProducer()
 val startingTimestamp = System.currentTimeMillis()
 sendRecords(producer, numRecords, tp, startingTimestamp = 
startingTimestamp)
 
+this.consumerConfig.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, 
groupProtocol)

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15544: Enable integration tests for new consumer [kafka]

2023-11-24 Thread via GitHub


AndrewJSchofield commented on code in PR #14758:
URL: https://github.com/apache/kafka/pull/14758#discussion_r1404560191


##
core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala:
##
@@ -36,14 +36,15 @@ import scala.collection.Seq
  */
 abstract class BaseConsumerTest extends AbstractConsumerTest {
 
-  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
-  @ValueSource(strings = Array("zk", "kraft", "kraft+kip848"))
-  def testSimpleConsumption(quorum: String): Unit = {
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testSimpleConsumption(quorum: String, groupProtocol: String): Unit = {
 val numRecords = 1
 val producer = createProducer()
 val startingTimestamp = System.currentTimeMillis()
 sendRecords(producer, numRecords, tp, startingTimestamp = 
startingTimestamp)
 
+this.consumerConfig.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, 
groupProtocol)

Review Comment:
   I did consider that. I have already endured the annoyance. I'll take a look.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15544: Enable integration tests for new consumer [kafka]

2023-11-24 Thread via GitHub


dajac commented on code in PR #14758:
URL: https://github.com/apache/kafka/pull/14758#discussion_r1404537279


##
core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala:
##
@@ -36,14 +36,15 @@ import scala.collection.Seq
  */
 abstract class BaseConsumerTest extends AbstractConsumerTest {
 
-  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
-  @ValueSource(strings = Array("zk", "kraft", "kraft+kip848"))
-  def testSimpleConsumption(quorum: String): Unit = {
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testSimpleConsumption(quorum: String, groupProtocol: String): Unit = {
 val numRecords = 1
 val producer = createProducer()
 val startingTimestamp = System.currentTimeMillis()
 sendRecords(producer, numRecords, tp, startingTimestamp = 
startingTimestamp)
 
+this.consumerConfig.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, 
groupProtocol)

Review Comment:
   Adding this change to all the tests is a bit annoying. Have you considered 
adding this to `IntegrationTestHarness.doSetup` or in `createConsumer`? We 
could infer it like we did with `isNewGroupCoordinatorEnabled()` in the same 
class.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15544: Enable integration tests for new consumer [kafka]

2023-11-24 Thread via GitHub


AndrewJSchofield commented on code in PR #14758:
URL: https://github.com/apache/kafka/pull/14758#discussion_r1404233938


##
core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala:
##
@@ -111,6 +115,40 @@ abstract class BaseConsumerTest extends 
AbstractConsumerTest {
 }
 
 object BaseConsumerTest {
+  // We want to test the following combinations:
+  // * ZooKeeper and the generic group protocol
+  // * KRaft and the generic group protocol
+  // * KRaft with the new group coordinator enabled and the consumer group 
protocol
+  def getTestQuorumAndGroupProtocolParametersAll() : 
java.util.stream.Stream[Arguments] = {
+java.util.stream.Stream.of(
+Arguments.of("zk", "generic"),
+Arguments.of("kraft", "generic"),
+Arguments.of("kraft+kip848", "consumer"))
+  }
+
+  // We want to test the following combinations:
+  // * ZooKeeper and the generic group protocol
+  def getTestQuorumAndGroupProtocolParametersZkOnly() : 
java.util.stream.Stream[Arguments] = {
+java.util.stream.Stream.of(
+Arguments.of("zk", "generic"))
+  }
+
+  // For tests that only work with the generic group protocol, we want to test 
the following combinations:
+  // * ZooKeeper and the generic group protocol
+  // * KRaft and the generic group protocol
+  def getTestQuorumAndGroupProtocolParametersGenericGroupProtocolOnly() : 
java.util.stream.Stream[Arguments] = {
+java.util.stream.Stream.of(
+Arguments.of("zk", "generic"),
+Arguments.of("kraft", "generic"))

Review Comment:
   OK. Nice and easy to change now I've refactored it. I'll get on it :) 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15544: Enable integration tests for new consumer [kafka]

2023-11-24 Thread via GitHub


dajac commented on code in PR #14758:
URL: https://github.com/apache/kafka/pull/14758#discussion_r1404226250


##
core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala:
##
@@ -111,6 +115,40 @@ abstract class BaseConsumerTest extends 
AbstractConsumerTest {
 }
 
 object BaseConsumerTest {
+  // We want to test the following combinations:
+  // * ZooKeeper and the generic group protocol
+  // * KRaft and the generic group protocol
+  // * KRaft with the new group coordinator enabled and the consumer group 
protocol
+  def getTestQuorumAndGroupProtocolParametersAll() : 
java.util.stream.Stream[Arguments] = {
+java.util.stream.Stream.of(
+Arguments.of("zk", "generic"),
+Arguments.of("kraft", "generic"),
+Arguments.of("kraft+kip848", "consumer"))
+  }
+
+  // We want to test the following combinations:
+  // * ZooKeeper and the generic group protocol
+  def getTestQuorumAndGroupProtocolParametersZkOnly() : 
java.util.stream.Stream[Arguments] = {
+java.util.stream.Stream.of(
+Arguments.of("zk", "generic"))
+  }
+
+  // For tests that only work with the generic group protocol, we want to test 
the following combinations:
+  // * ZooKeeper and the generic group protocol
+  // * KRaft and the generic group protocol
+  def getTestQuorumAndGroupProtocolParametersGenericGroupProtocolOnly() : 
java.util.stream.Stream[Arguments] = {
+java.util.stream.Stream.of(
+Arguments.of("zk", "generic"),
+Arguments.of("kraft", "generic"))

Review Comment:
   We likely need it here too.



##
core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala:
##
@@ -111,6 +115,40 @@ abstract class BaseConsumerTest extends 
AbstractConsumerTest {
 }
 
 object BaseConsumerTest {
+  // We want to test the following combinations:
+  // * ZooKeeper and the generic group protocol
+  // * KRaft and the generic group protocol
+  // * KRaft with the new group coordinator enabled and the consumer group 
protocol
+  def getTestQuorumAndGroupProtocolParametersAll() : 
java.util.stream.Stream[Arguments] = {
+java.util.stream.Stream.of(
+Arguments.of("zk", "generic"),
+Arguments.of("kraft", "generic"),
+Arguments.of("kraft+kip848", "consumer"))

Review Comment:
   We also need to test the `generic` with `kraft+kip848`. This is what all the 
tests with `kraft+kip848` prior to your change did.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15544: Enable integration tests for new consumer [kafka]

2023-11-24 Thread via GitHub


AndrewJSchofield commented on PR #14758:
URL: https://github.com/apache/kafka/pull/14758#issuecomment-1825499807

   This PR now reflects the changes in KAFKA-14781 and also tests the new 
consumer.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15544: Enable integration tests for new consumer [kafka]

2023-11-24 Thread via GitHub


AndrewJSchofield commented on code in PR #14758:
URL: https://github.com/apache/kafka/pull/14758#discussion_r1404213789


##
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##
@@ -1947,4 +2082,215 @@ class PlaintextConsumerTest extends BaseConsumerTest {
 
 consumer2.close()
   }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("generic", "consumer"))
+  def testAssignAndCommitAsync(groupProtocol: String): Unit = {
+val props = new Properties()
+props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol)
+val consumer = createConsumer(configOverrides = props)
+val producer = createProducer()
+val numRecords = 1
+val startingTimestamp = System.currentTimeMillis()
+val cb = new CountConsumerCommitCallback
+sendRecords(producer, numRecords, tp, startingTimestamp = 
startingTimestamp)
+consumer.assign(List(tp).asJava)
+consumer.commitAsync(cb)
+TestUtils.pollUntilTrue(consumer, () => cb.successCount >= 1 || 
cb.lastError.isDefined,
+  "Failed to observe commit callback before timeout", waitTimeMs = 1)
+val committedOffset = consumer.committed(Set(tp).asJava)
+assertNotNull(committedOffset)
+// No valid fetch position due to the absence of consumer.poll; and 
therefore no offset was committed to
+// tp. The committed offset should be null. This is intentional.
+assertNull(committedOffset.get(tp))
+assertTrue(consumer.assignment.contains(tp))
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("generic", "consumer"))
+  def testAssignAndCommitSync(groupProtocol: String): Unit = {
+val props = new Properties()
+props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol)
+val consumer = createConsumer(configOverrides = props)
+val producer = createProducer()
+val numRecords = 1
+val startingTimestamp = System.currentTimeMillis()
+sendRecords(producer, numRecords, tp, startingTimestamp = 
startingTimestamp)
+consumer.assign(List(tp).asJava)
+consumer.commitSync()
+val committedOffset = consumer.committed(Set(tp).asJava)
+assertNotNull(committedOffset)
+// No valid fetch position due to the absence of consumer.poll; and 
therefore no offset was committed to
+// tp. The committed offset should be null. This is intentional.
+assertNull(committedOffset.get(tp))
+assertTrue(consumer.assignment.contains(tp))
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("generic", "consumer"))
+  def testAssignAndCommitSyncAllConsumed(groupProtocol: String): Unit = {
+val numRecords = 1
+
+val producer = createProducer()
+val startingTimestamp = System.currentTimeMillis()
+sendRecords(producer, numRecords, tp, startingTimestamp = 
startingTimestamp)
+
+val props = new Properties()
+props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol)
+val consumer = createConsumer(configOverrides = props)
+consumer.assign(List(tp).asJava)
+consumer.seek(tp, 0)
+consumeAndVerifyRecords(consumer = consumer, numRecords, startingOffset = 
0, startingTimestamp = startingTimestamp)
+
+consumer.commitSync()
+val committedOffset = consumer.committed(Set(tp).asJava)
+assertNotNull(committedOffset)
+assertNotNull(committedOffset.get(tp))
+assertEquals(numRecords, committedOffset.get(tp).offset())
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("generic", "consumer"))
+  def testAssignAndConsume(groupProtocol: String): Unit = {
+val numRecords = 10
+
+val producer = createProducer()
+val startingTimestamp = System.currentTimeMillis()
+sendRecords(producer, numRecords, tp, startingTimestamp = 
startingTimestamp)
+
+val props = new Properties()
+props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol)
+val consumer = createConsumer(configOverrides = props,
+  configsToRemove = List(ConsumerConfig.GROUP_ID_CONFIG))
+consumer.assign(List(tp).asJava)
+consumeAndVerifyRecords(consumer = consumer, numRecords, startingOffset = 
0, startingTimestamp = startingTimestamp)
+
+assertEquals(numRecords, consumer.position(tp))
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("generic", "consumer"))
+  def testAssignAndConsumeSkippingPosition(groupProtocol: String): Unit = {
+val numRecords = 10
+
+val producer = createProducer()
+val startingTimestamp = System.currentTimeMillis()
+sendRecords(producer, numRecords, tp, startingTimestamp = 
startingTimestamp)
+
+val props = new Properties()
+props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol)
+val consumer = createConsumer(configOverrides = props,
+  configsToRemove = List(ConsumerConfig.GROUP_ID_CONFIG))
+consumer.assign(List(tp).asJava)
+val offset = 1
+consumer.seek(tp, offset)
+consumeAndVerifyRecords(consumer = consumer, numRecords - offset, 
startingOffset = off

Re: [PR] KAFKA-15544: Enable integration tests for new consumer [kafka]

2023-11-24 Thread via GitHub


AndrewJSchofield commented on code in PR #14758:
URL: https://github.com/apache/kafka/pull/14758#discussion_r1404208505


##
core/src/test/scala/kafka/utils/TestInfoUtils.scala:
##
@@ -39,6 +39,12 @@ object TestInfoUtils {
   } else {
 throw new RuntimeException(s"Unknown quorum value")
   }
+} else if (testInfo.getDisplayName().contains("groupProtocol=")) {

Review Comment:
   I've replaced this in the new commit.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15544: Enable integration tests for new consumer [kafka]

2023-11-24 Thread via GitHub


AndrewJSchofield commented on code in PR #14758:
URL: https://github.com/apache/kafka/pull/14758#discussion_r1404207856


##
core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala:
##
@@ -34,13 +35,15 @@ import scala.collection.Seq
  */
 abstract class BaseConsumerTest extends AbstractConsumerTest {
 
-  @Test
-  def testSimpleConsumption(): Unit = {
+  @ParameterizedTest
+  @ValueSource(strings = Array("generic", "consumer"))

Review Comment:
   This has been replaced with a `MethodSource` that is capable of returning 
whatever combination we want.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15544: Enable integration tests for new consumer [kafka]

2023-11-23 Thread via GitHub


dajac commented on PR #14758:
URL: https://github.com/apache/kafka/pull/14758#issuecomment-1824112828

   I just merged https://github.com/apache/kafka/pull/14781. Thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15544: Enable integration tests for new consumer [kafka]

2023-11-23 Thread via GitHub


AndrewJSchofield commented on code in PR #14758:
URL: https://github.com/apache/kafka/pull/14758#discussion_r1403093845


##
core/src/test/scala/kafka/utils/TestInfoUtils.scala:
##
@@ -39,6 +39,12 @@ object TestInfoUtils {
   } else {
 throw new RuntimeException(s"Unknown quorum value")
   }
+} else if (testInfo.getDisplayName().contains("groupProtocol=")) {

Review Comment:
   It's because the parameterized tests in `PlaintextConsumerTest` choose 
between "generic" and "consumer". The new consumer ("consumer") is only 
supported for KRaft clusters, so it's necessary to make the test use KRaft in 
this instance.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15544: Enable integration tests for new consumer [kafka]

2023-11-23 Thread via GitHub


AndrewJSchofield commented on code in PR #14758:
URL: https://github.com/apache/kafka/pull/14758#discussion_r1403092072


##
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##
@@ -277,8 +294,10 @@ class PlaintextConsumerTest extends BaseConsumerTest {
 assertEquals(500, 
anotherConsumer.committed(Set(tp2).asJava).get(tp2).offset)
   }
 
-  @Test
-  def testAutoCommitOnCloseAfterWakeup(): Unit = {
+  @ParameterizedTest
+  @ValueSource(strings = Array("generic")) // close() is not committing 
offsets in consumer group protocol

Review Comment:
   I agree with the idea, but I will use a less wordy formulation such as 
"temporarily". I expect a bunch of these will be resolved in the next two weeks 
and we can run most of the tests across both consumers.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15544: Enable integration tests for new consumer [kafka]

2023-11-23 Thread via GitHub


AndrewJSchofield commented on PR #14758:
URL: https://github.com/apache/kafka/pull/14758#issuecomment-1824026135

   @dajac I suggest you simply merge #14871 and let me deal with the fall-out. 
This one is not going to be far behind.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15544: Enable integration tests for new consumer [kafka]

2023-11-22 Thread via GitHub


dajac commented on PR #14758:
URL: https://github.com/apache/kafka/pull/14758#issuecomment-1822554162

   @AndrewJSchofield We are about to merge 
https://github.com/apache/kafka/pull/14781. Is it possible to build this one on 
top of it?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15544: Enable integration tests for new consumer [kafka]

2023-11-22 Thread via GitHub


cadonna commented on code in PR #14758:
URL: https://github.com/apache/kafka/pull/14758#discussion_r1401760510


##
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##
@@ -277,8 +294,10 @@ class PlaintextConsumerTest extends BaseConsumerTest {
 assertEquals(500, 
anotherConsumer.committed(Set(tp2).asJava).get(tp2).offset)
   }
 
-  @Test
-  def testAutoCommitOnCloseAfterWakeup(): Unit = {
+  @ParameterizedTest
+  @ValueSource(strings = Array("generic")) // close() is not committing 
offsets in consumer group protocol

Review Comment:
   This is just a transient state, isn't it? At least 
https://issues.apache.org/jira/browse/KAFKA-15327 says that committing on close 
is planned. If KAFKA-15327 is still valid, can we formulate this comment 
accordingly like `close() is not committing offsets in consumer group protocol 
for now but it should when implementation is complete`.



##
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##
@@ -1137,18 +1202,23 @@ class PlaintextConsumerTest extends BaseConsumerTest {
 }
   }
 
-  @Test
-  def testMultiConsumerSessionTimeoutOnStopPolling(): Unit = {
+  @ParameterizedTest
+  @ValueSource(strings = Array("generic")) // ConsumerRebalanceListener not 
supported for consumer group protocol

Review Comment:
   I guess the `ConsumerRebalanceListener` is also something that will be 
supported in the future by the consumer group protocol. 



##
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##
@@ -537,33 +574,41 @@ class PlaintextConsumerTest extends BaseConsumerTest {
 awaitAssignment(consumer, shrunkenAssignment)
   }
 
-  @Test
-  def testPartitionsFor(): Unit = {
+  @ParameterizedTest
+  @ValueSource(strings = Array("generic")) // partitionsFor not supported for 
consumer group protocol
+  def testPartitionsFor(groupProtocol: String): Unit = {
 val numParts = 2
 createTopic("part-test", numParts, 1)
+this.consumerConfig.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, 
groupProtocol)
 val consumer = createConsumer()
 val parts = consumer.partitionsFor("part-test")
 assertNotNull(parts)
 assertEquals(2, parts.size)
   }
 
-  @Test
-  def testPartitionsForAutoCreate(): Unit = {
+  @ParameterizedTest
+  @ValueSource(strings = Array("generic")) // partitionsFor not supported for 
consumer group protocol
+  def testPartitionsForAutoCreate(groupProtocol: String): Unit = {
+this.consumerConfig.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, 
groupProtocol)
 val consumer = createConsumer()
 // First call would create the topic
 consumer.partitionsFor("non-exist-topic")
 val partitions = consumer.partitionsFor("non-exist-topic")
 assertFalse(partitions.isEmpty)
   }
 
-  @Test
-  def testPartitionsForInvalidTopic(): Unit = {
+  @ParameterizedTest
+  @ValueSource(strings = Array("generic")) // partitionsFor not supported for 
consumer group protocol

Review Comment:
   See my comment above about the functionality being supported in future.



##
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##
@@ -1137,18 +1202,23 @@ class PlaintextConsumerTest extends BaseConsumerTest {
 }
   }
 
-  @Test
-  def testMultiConsumerSessionTimeoutOnStopPolling(): Unit = {
+  @ParameterizedTest
+  @ValueSource(strings = Array("generic")) // ConsumerRebalanceListener not 
supported for consumer group protocol
+  def testMultiConsumerSessionTimeoutOnStopPolling(groupProtocol: String): 
Unit = {
+this.consumerConfig.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, 
groupProtocol)
 runMultiConsumerSessionTimeoutTest(false)
   }
 
-  @Test
-  def testMultiConsumerSessionTimeoutOnClose(): Unit = {
+  @ParameterizedTest
+  @ValueSource(strings = Array("generic")) // ConsumerRebalanceListener not 
supported for consumer group protocol
+  def testMultiConsumerSessionTimeoutOnClose(groupProtocol: String): Unit = {
+this.consumerConfig.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, 
groupProtocol)
 runMultiConsumerSessionTimeoutTest(true)
   }
 
-  @Test
-  def testInterceptors(): Unit = {
+  @ParameterizedTest
+  @ValueSource(strings = Array("generic")) // Consumer interceptors not 
implemented for consumer group protocol

Review Comment:
   Will also interceptors be supported?



##
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##
@@ -1137,18 +1202,23 @@ class PlaintextConsumerTest extends BaseConsumerTest {
 }
   }
 
-  @Test
-  def testMultiConsumerSessionTimeoutOnStopPolling(): Unit = {
+  @ParameterizedTest
+  @ValueSource(strings = Array("generic")) // ConsumerRebalanceListener not 
supported for consumer group protocol
+  def testMultiConsumerSessionTimeoutOnStopPolling(groupProtocol: String): 
Unit = {
+this.consumerConfig.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, 
groupProtocol)
 runMultiConsumerS

Re: [PR] KAFKA-15544: Enable integration tests for new consumer [kafka]

2023-11-21 Thread via GitHub


cadonna commented on PR #14758:
URL: https://github.com/apache/kafka/pull/14758#issuecomment-1821395496

   Done!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15544: Enable integration tests for new consumer [kafka]

2023-11-21 Thread via GitHub


kirktrue commented on PR #14758:
URL: https://github.com/apache/kafka/pull/14758#issuecomment-1821388129

   @cadonna / @lucasbru—can we add the `ctr` label to this PR?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15544: Enable integration tests for new consumer [kafka]

2023-11-21 Thread via GitHub


AndrewJSchofield commented on PR #14758:
URL: https://github.com/apache/kafka/pull/14758#issuecomment-1820506198

   Converting back to draft. This PR depends on 
https://github.com/apache/kafka/pull/14801 being merged. This is ready for 
review, but will not build cleanly until the other PR lands.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15544: Enable integration tests for new consumer [kafka]

2023-11-14 Thread via GitHub


kirktrue commented on PR #14758:
URL: https://github.com/apache/kafka/pull/14758#issuecomment-1811599069

   @philipnee can you tag with `ctr` and `KIP-848` 🥺 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15544: Enable integration tests for new consumer [kafka]

2023-11-14 Thread via GitHub


AndrewJSchofield commented on code in PR #14758:
URL: https://github.com/apache/kafka/pull/14758#discussion_r1393368519


##
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##
@@ -1805,17 +1909,14 @@ class PlaintextConsumerTest extends BaseConsumerTest {
 s"The current assignment is ${consumer.assignment()}")
   }
 
-  @Test
-  def testConsumingWithNullGroupId(): Unit = {
+  @ParameterizedTest
+  @ValueSource(strings = Array("generic")) // Null group ID only supported for 
generic group protocol
+  def testConsumingWithNullGroupId(groupProtocol: String): Unit = {
 val topic = "test_topic"
-val partition = 0;
+val partition = 0

Review Comment:
   I missed one :)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15544: Enable integration tests for new consumer [kafka]

2023-11-14 Thread via GitHub


AndrewJSchofield commented on code in PR #14758:
URL: https://github.com/apache/kafka/pull/14758#discussion_r1393368519


##
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##
@@ -1805,17 +1909,14 @@ class PlaintextConsumerTest extends BaseConsumerTest {
 s"The current assignment is ${consumer.assignment()}")
   }
 
-  @Test
-  def testConsumingWithNullGroupId(): Unit = {
+  @ParameterizedTest
+  @ValueSource(strings = Array("generic")) // Null group ID only supported for 
generic group protocol
+  def testConsumingWithNullGroupId(groupProtocol: String): Unit = {
 val topic = "test_topic"
-val partition = 0;
+val partition = 0

Review Comment:
   I missed one :)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15544: Enable integration tests for new consumer [kafka]

2023-11-14 Thread via GitHub


AndrewJSchofield commented on code in PR #14758:
URL: https://github.com/apache/kafka/pull/14758#discussion_r1393368202


##
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##
@@ -1805,17 +1909,14 @@ class PlaintextConsumerTest extends BaseConsumerTest {
 s"The current assignment is ${consumer.assignment()}")
   }
 
-  @Test
-  def testConsumingWithNullGroupId(): Unit = {
+  @ParameterizedTest
+  @ValueSource(strings = Array("generic")) // Null group ID only supported for 
generic group protocol
+  def testConsumingWithNullGroupId(groupProtocol: String): Unit = {
 val topic = "test_topic"
-val partition = 0;
+val partition = 0
 val tp = new TopicPartition(topic, partition)
 createTopic(topic, 1, 1)
 
-TestUtils.waitUntilTrue(() => {
-  this.zkClient.topicExists(topic)
-}, "Failed to create topic")
-

Review Comment:
   I'm not convinced we ever did. Again, this is ZK-specific. The tests work on 
both variants without this check.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15544: Enable integration tests for new consumer [kafka]

2023-11-14 Thread via GitHub


AndrewJSchofield commented on code in PR #14758:
URL: https://github.com/apache/kafka/pull/14758#discussion_r1393367240


##
core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala:
##
@@ -34,13 +35,15 @@ import scala.collection.Seq
  */
 abstract class BaseConsumerTest extends AbstractConsumerTest {
 
-  @Test
-  def testSimpleConsumption(): Unit = {
+  @ParameterizedTest
+  @ValueSource(strings = Array("generic", "consumer"))

Review Comment:
   Perhaps, but I want to be able to have a different array for each test to 
enable them to be turned on individually.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15544: Enable integration tests for new consumer [kafka]

2023-11-14 Thread via GitHub


AndrewJSchofield commented on code in PR #14758:
URL: https://github.com/apache/kafka/pull/14758#discussion_r1393365674


##
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##
@@ -1657,35 +1759,37 @@ class PlaintextConsumerTest extends BaseConsumerTest {
 assertEquals(numMessages - records.count, 
lag.metricValue.asInstanceOf[Double], epsilon, s"The lag should be 
${numMessages - records.count}")
   }
 
-  @Test
-  def testQuotaMetricsNotCreatedIfNoQuotasConfigured(): Unit = {
+  @ParameterizedTest
+  @ValueSource(strings = Array("generic", "consumer"))
+  def testQuotaMetricsNotCreatedIfNoQuotasConfigured(groupProtocol: String): 
Unit = {
 val numRecords = 1000
 val producer = createProducer()
 val startingTimestamp = System.currentTimeMillis()
 sendRecords(producer, numRecords, tp, startingTimestamp = 
startingTimestamp)
 
+this.consumerConfig.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, 
groupProtocol)
 val consumer = createConsumer()
 consumer.assign(List(tp).asJava)
 consumer.seek(tp, 0)
 consumeAndVerifyRecords(consumer = consumer, numRecords = numRecords, 
startingOffset = 0, startingTimestamp = startingTimestamp)
 
-def assertNoMetric(broker: KafkaServer, name: String, quotaType: 
QuotaType, clientId: String): Unit = {
-val metricName = broker.metrics.metricName("throttle-time",
-  quotaType.toString,
-  "",
-  "user", "",
-  "client-id", clientId)
-assertNull(broker.metrics.metric(metricName), "Metric should not have 
been created " + metricName)
+def assertNoMetric(broker: KafkaBroker, name: String, quotaType: 
QuotaType, clientId: String): Unit = {
+  val metricName = broker.metrics.metricName("throttle-time",
+quotaType.toString,
+"",
+"user", "",
+"client-id", clientId)
+  assertNull(broker.metrics.metric(metricName), "Metric should not have 
been created " + metricName)
 }
-servers.foreach(assertNoMetric(_, "byte-rate", QuotaType.Produce, 
producerClientId))
-servers.foreach(assertNoMetric(_, "throttle-time", QuotaType.Produce, 
producerClientId))
-servers.foreach(assertNoMetric(_, "byte-rate", QuotaType.Fetch, 
consumerClientId))
-servers.foreach(assertNoMetric(_, "throttle-time", QuotaType.Fetch, 
consumerClientId))
+brokers.foreach(assertNoMetric(_, "byte-rate", QuotaType.Produce, 
producerClientId))
+brokers.foreach(assertNoMetric(_, "throttle-time", QuotaType.Produce, 
producerClientId))
+brokers.foreach(assertNoMetric(_, "byte-rate", QuotaType.Fetch, 
consumerClientId))
+brokers.foreach(assertNoMetric(_, "throttle-time", QuotaType.Fetch, 
consumerClientId))
 
-servers.foreach(assertNoMetric(_, "request-time", QuotaType.Request, 
producerClientId))
-servers.foreach(assertNoMetric(_, "throttle-time", QuotaType.Request, 
producerClientId))
-servers.foreach(assertNoMetric(_, "request-time", QuotaType.Request, 
consumerClientId))
-servers.foreach(assertNoMetric(_, "throttle-time", QuotaType.Request, 
consumerClientId))
+brokers.foreach(assertNoMetric(_, "request-time", QuotaType.Request, 
producerClientId))
+brokers.foreach(assertNoMetric(_, "throttle-time", QuotaType.Request, 
producerClientId))
+brokers.foreach(assertNoMetric(_, "request-time", QuotaType.Request, 
consumerClientId))
+brokers.foreach(assertNoMetric(_, "throttle-time", QuotaType.Request, 
consumerClientId))

Review Comment:
   In this context, `servers` and `brokers` are interchangeable. This change 
makes the test work for ZK or KRaft. Previously, it was ZK-only.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15544: Enable integration tests for new consumer [kafka]

2023-11-14 Thread via GitHub


kirktrue commented on code in PR #14758:
URL: https://github.com/apache/kafka/pull/14758#discussion_r1393305110


##
core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala:
##
@@ -34,13 +35,15 @@ import scala.collection.Seq
  */
 abstract class BaseConsumerTest extends AbstractConsumerTest {
 
-  @Test
-  def testSimpleConsumption(): Unit = {
+  @ParameterizedTest
+  @ValueSource(strings = Array("generic", "consumer"))

Review Comment:
   Is it possible to have a different type of 'source' that can be defined once 
vs. on each test? More of a nit, but curious.



##
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##
@@ -1805,17 +1909,14 @@ class PlaintextConsumerTest extends BaseConsumerTest {
 s"The current assignment is ${consumer.assignment()}")
   }
 
-  @Test
-  def testConsumingWithNullGroupId(): Unit = {
+  @ParameterizedTest
+  @ValueSource(strings = Array("generic")) // Null group ID only supported for 
generic group protocol
+  def testConsumingWithNullGroupId(groupProtocol: String): Unit = {
 val topic = "test_topic"
-val partition = 0;
+val partition = 0
 val tp = new TopicPartition(topic, partition)
 createTopic(topic, 1, 1)
 
-TestUtils.waitUntilTrue(() => {
-  this.zkClient.topicExists(topic)
-}, "Failed to create topic")
-

Review Comment:
   Why don't we need this now?



##
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##
@@ -1657,35 +1759,37 @@ class PlaintextConsumerTest extends BaseConsumerTest {
 assertEquals(numMessages - records.count, 
lag.metricValue.asInstanceOf[Double], epsilon, s"The lag should be 
${numMessages - records.count}")
   }
 
-  @Test
-  def testQuotaMetricsNotCreatedIfNoQuotasConfigured(): Unit = {
+  @ParameterizedTest
+  @ValueSource(strings = Array("generic", "consumer"))
+  def testQuotaMetricsNotCreatedIfNoQuotasConfigured(groupProtocol: String): 
Unit = {
 val numRecords = 1000
 val producer = createProducer()
 val startingTimestamp = System.currentTimeMillis()
 sendRecords(producer, numRecords, tp, startingTimestamp = 
startingTimestamp)
 
+this.consumerConfig.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, 
groupProtocol)
 val consumer = createConsumer()
 consumer.assign(List(tp).asJava)
 consumer.seek(tp, 0)
 consumeAndVerifyRecords(consumer = consumer, numRecords = numRecords, 
startingOffset = 0, startingTimestamp = startingTimestamp)
 
-def assertNoMetric(broker: KafkaServer, name: String, quotaType: 
QuotaType, clientId: String): Unit = {
-val metricName = broker.metrics.metricName("throttle-time",
-  quotaType.toString,
-  "",
-  "user", "",
-  "client-id", clientId)
-assertNull(broker.metrics.metric(metricName), "Metric should not have 
been created " + metricName)
+def assertNoMetric(broker: KafkaBroker, name: String, quotaType: 
QuotaType, clientId: String): Unit = {
+  val metricName = broker.metrics.metricName("throttle-time",
+quotaType.toString,
+"",
+"user", "",
+"client-id", clientId)
+  assertNull(broker.metrics.metric(metricName), "Metric should not have 
been created " + metricName)
 }
-servers.foreach(assertNoMetric(_, "byte-rate", QuotaType.Produce, 
producerClientId))
-servers.foreach(assertNoMetric(_, "throttle-time", QuotaType.Produce, 
producerClientId))
-servers.foreach(assertNoMetric(_, "byte-rate", QuotaType.Fetch, 
consumerClientId))
-servers.foreach(assertNoMetric(_, "throttle-time", QuotaType.Fetch, 
consumerClientId))
+brokers.foreach(assertNoMetric(_, "byte-rate", QuotaType.Produce, 
producerClientId))
+brokers.foreach(assertNoMetric(_, "throttle-time", QuotaType.Produce, 
producerClientId))
+brokers.foreach(assertNoMetric(_, "byte-rate", QuotaType.Fetch, 
consumerClientId))
+brokers.foreach(assertNoMetric(_, "throttle-time", QuotaType.Fetch, 
consumerClientId))
 
-servers.foreach(assertNoMetric(_, "request-time", QuotaType.Request, 
producerClientId))
-servers.foreach(assertNoMetric(_, "throttle-time", QuotaType.Request, 
producerClientId))
-servers.foreach(assertNoMetric(_, "request-time", QuotaType.Request, 
consumerClientId))
-servers.foreach(assertNoMetric(_, "throttle-time", QuotaType.Request, 
consumerClientId))
+brokers.foreach(assertNoMetric(_, "request-time", QuotaType.Request, 
producerClientId))
+brokers.foreach(assertNoMetric(_, "throttle-time", QuotaType.Request, 
producerClientId))
+brokers.foreach(assertNoMetric(_, "request-time", QuotaType.Request, 
consumerClientId))
+brokers.foreach(assertNoMetric(_, "throttle-time", QuotaType.Request, 
consumerClientId))

Review Comment:
   So this is incorrect in `trunk`, right?



##
core/src/test/scala/integration/kafka/