Re: [PR] KAFKA-15544: Enable integration tests for new consumer [kafka]
lucasbru merged PR #14758: URL: https://github.com/apache/kafka/pull/14758 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15544: Enable integration tests for new consumer [kafka]
lucasbru commented on PR #14758: URL: https://github.com/apache/kafka/pull/14758#issuecomment-1830668394 I created 3 new flaky test tickets, but all failed tests were flaky before on master.
Re: [PR] KAFKA-15544: Enable integration tests for new consumer [kafka]
lucasbru commented on PR #14758: URL: https://github.com/apache/kafka/pull/14758#issuecomment-1830071627 CI is blocked by a hanging test bug that is going to celebrate its fourth birthday soon: https://issues.apache.org/jira/browse/KAFKA-9470. I restarted CI and submitted https://github.com/apache/kafka/pull/14855
Re: [PR] KAFKA-15544: Enable integration tests for new consumer [kafka]
lucasbru commented on code in PR #14758: URL: https://github.com/apache/kafka/pull/14758#discussion_r1405954797

## core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala: ##
@@ -36,14 +36,15 @@ import scala.collection.Seq
  */
 abstract class BaseConsumerTest extends AbstractConsumerTest {
-  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
-  @ValueSource(strings = Array("zk", "kraft", "kraft+kip848"))
-  def testSimpleConsumption(quorum: String): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testSimpleConsumption(quorum: String, groupProtocol: String): Unit = {
     val numRecords = 1
     val producer = createProducer()
     val startingTimestamp = System.currentTimeMillis()
     sendRecords(producer, numRecords, tp, startingTimestamp = startingTimestamp)
+    this.consumerConfig.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol)

Review Comment: Can we remove this line now?

## core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala: ##
@@ -69,6 +70,7 @@ abstract class BaseConsumerTest extends AbstractConsumerTest {
     sendRecords(producer, numRecords, tp, startingTimestamp = startingTimestamp)
     val consumerProps = new Properties()
+    this.consumerConfig.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol)

Review Comment: Can we remove this line now?

## core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala: ##
@@ -78,10 +80,12 @@ abstract class BaseConsumerTest extends AbstractConsumerTest {
     assertNotEquals(0, BaseConsumerTest.updateConsumerCount.get())
   }
-  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
-  @ValueSource(strings = Array("zk", "kraft", "kraft+kip848"))
-  def testCoordinatorFailover(quorum: String): Unit = {
+  // ConsumerRebalanceListener temporarily not supported for consumer group protocol
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersGenericGroupProtocolOnly"))
+  def testCoordinatorFailover(quorum: String, groupProtocol: String): Unit = {
     val listener = new TestConsumerReassignmentListener()
+    this.consumerConfig.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol)

Review Comment: can we remove this line now?
Re: [PR] KAFKA-15544: Enable integration tests for new consumer [kafka]
AndrewJSchofield commented on code in PR #14758: URL: https://github.com/apache/kafka/pull/14758#discussion_r1404635960

## core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala: ##
@@ -36,14 +36,15 @@ import scala.collection.Seq
  */
 abstract class BaseConsumerTest extends AbstractConsumerTest {
-  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
-  @ValueSource(strings = Array("zk", "kraft", "kraft+kip848"))
-  def testSimpleConsumption(quorum: String): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testSimpleConsumption(quorum: String, groupProtocol: String): Unit = {
     val numRecords = 1
     val producer = createProducer()
     val startingTimestamp = System.currentTimeMillis()
     sendRecords(producer, numRecords, tp, startingTimestamp = startingTimestamp)
+    this.consumerConfig.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol)

Review Comment: Done
Re: [PR] KAFKA-15544: Enable integration tests for new consumer [kafka]
AndrewJSchofield commented on code in PR #14758: URL: https://github.com/apache/kafka/pull/14758#discussion_r1404560191

## core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala: ##
@@ -36,14 +36,15 @@ import scala.collection.Seq
  */
 abstract class BaseConsumerTest extends AbstractConsumerTest {
-  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
-  @ValueSource(strings = Array("zk", "kraft", "kraft+kip848"))
-  def testSimpleConsumption(quorum: String): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testSimpleConsumption(quorum: String, groupProtocol: String): Unit = {
     val numRecords = 1
     val producer = createProducer()
     val startingTimestamp = System.currentTimeMillis()
     sendRecords(producer, numRecords, tp, startingTimestamp = startingTimestamp)
+    this.consumerConfig.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol)

Review Comment: I did consider that. I have already endured the annoyance. I'll take a look.
Re: [PR] KAFKA-15544: Enable integration tests for new consumer [kafka]
dajac commented on code in PR #14758: URL: https://github.com/apache/kafka/pull/14758#discussion_r1404537279

## core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala: ##
@@ -36,14 +36,15 @@ import scala.collection.Seq
  */
 abstract class BaseConsumerTest extends AbstractConsumerTest {
-  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
-  @ValueSource(strings = Array("zk", "kraft", "kraft+kip848"))
-  def testSimpleConsumption(quorum: String): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testSimpleConsumption(quorum: String, groupProtocol: String): Unit = {
     val numRecords = 1
     val producer = createProducer()
     val startingTimestamp = System.currentTimeMillis()
     sendRecords(producer, numRecords, tp, startingTimestamp = startingTimestamp)
+    this.consumerConfig.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol)

Review Comment: Adding this change to all the tests is a bit annoying. Have you considered adding this to `IntegrationTestHarness.doSetup` or in `createConsumer`? We could infer it like we did with `isNewGroupCoordinatorEnabled()` in the same class.
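Editor's sketch of the suggestion above: rather than setting the group protocol in every test body, a harness could derive it once from the parameterized test's display name and apply it when building consumer properties. Everything here is hypothetical and is not the real `IntegrationTestHarness`: the object name, the helper names, the display-name format `quorum=...groupProtocol=...`, and the config key string (assumed to match `ConsumerConfig.GROUP_PROTOCOL_CONFIG`) are assumptions for illustration only.

```scala
import java.util.Properties

// Hypothetical stand-in for harness logic; not Kafka's IntegrationTestHarness.
object GroupProtocolInference {
  // Assumed key; the real tests use ConsumerConfig.GROUP_PROTOCOL_CONFIG.
  val GroupProtocolConfig = "group.protocol"

  // Returns the value after "groupProtocol=" in a display name such as
  // "quorum=kraft+kip848.groupProtocol=consumer" (format assumed), if present.
  def groupProtocolFromDisplayName(displayName: String): Option[String] = {
    val marker = "groupProtocol="
    val idx = displayName.indexOf(marker)
    if (idx < 0) None
    else Some(displayName.substring(idx + marker.length).takeWhile(_.isLetterOrDigit))
  }

  // Builds consumer properties: the inferred protocol first, then explicit
  // overrides, so a test can still force a specific value.
  def consumerProps(displayName: String, overrides: Properties = new Properties()): Properties = {
    val props = new Properties()
    groupProtocolFromDisplayName(displayName).foreach(p => props.setProperty(GroupProtocolConfig, p))
    val names = overrides.stringPropertyNames().iterator()
    while (names.hasNext) {
      val key = names.next()
      props.setProperty(key, overrides.getProperty(key))
    }
    props
  }
}
```

With a helper of this shape, individual tests would no longer need their own `setProperty` call, which is the repetition the review comment objects to.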
Re: [PR] KAFKA-15544: Enable integration tests for new consumer [kafka]
AndrewJSchofield commented on code in PR #14758: URL: https://github.com/apache/kafka/pull/14758#discussion_r1404233938

## core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala: ##
@@ -111,6 +115,40 @@ abstract class BaseConsumerTest extends AbstractConsumerTest {
 }
 object BaseConsumerTest {
+  // We want to test the following combinations:
+  // * ZooKeeper and the generic group protocol
+  // * KRaft and the generic group protocol
+  // * KRaft with the new group coordinator enabled and the consumer group protocol
+  def getTestQuorumAndGroupProtocolParametersAll() : java.util.stream.Stream[Arguments] = {
+    java.util.stream.Stream.of(
+        Arguments.of("zk", "generic"),
+        Arguments.of("kraft", "generic"),
+        Arguments.of("kraft+kip848", "consumer"))
+  }
+
+  // We want to test the following combinations:
+  // * ZooKeeper and the generic group protocol
+  def getTestQuorumAndGroupProtocolParametersZkOnly() : java.util.stream.Stream[Arguments] = {
+    java.util.stream.Stream.of(
+        Arguments.of("zk", "generic"))
+  }
+
+  // For tests that only work with the generic group protocol, we want to test the following combinations:
+  // * ZooKeeper and the generic group protocol
+  // * KRaft and the generic group protocol
+  def getTestQuorumAndGroupProtocolParametersGenericGroupProtocolOnly() : java.util.stream.Stream[Arguments] = {
+    java.util.stream.Stream.of(
+        Arguments.of("zk", "generic"),
+        Arguments.of("kraft", "generic"))

Review Comment: OK. Nice and easy to change now I've refactored it. I'll get on it :)
Re: [PR] KAFKA-15544: Enable integration tests for new consumer [kafka]
dajac commented on code in PR #14758: URL: https://github.com/apache/kafka/pull/14758#discussion_r1404226250

## core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala: ##
@@ -111,6 +115,40 @@ abstract class BaseConsumerTest extends AbstractConsumerTest {
 }
 object BaseConsumerTest {
+  // We want to test the following combinations:
+  // * ZooKeeper and the generic group protocol
+  // * KRaft and the generic group protocol
+  // * KRaft with the new group coordinator enabled and the consumer group protocol
+  def getTestQuorumAndGroupProtocolParametersAll() : java.util.stream.Stream[Arguments] = {
+    java.util.stream.Stream.of(
+        Arguments.of("zk", "generic"),
+        Arguments.of("kraft", "generic"),
+        Arguments.of("kraft+kip848", "consumer"))
+  }
+
+  // We want to test the following combinations:
+  // * ZooKeeper and the generic group protocol
+  def getTestQuorumAndGroupProtocolParametersZkOnly() : java.util.stream.Stream[Arguments] = {
+    java.util.stream.Stream.of(
+        Arguments.of("zk", "generic"))
+  }
+
+  // For tests that only work with the generic group protocol, we want to test the following combinations:
+  // * ZooKeeper and the generic group protocol
+  // * KRaft and the generic group protocol
+  def getTestQuorumAndGroupProtocolParametersGenericGroupProtocolOnly() : java.util.stream.Stream[Arguments] = {
+    java.util.stream.Stream.of(
+        Arguments.of("zk", "generic"),
+        Arguments.of("kraft", "generic"))

Review Comment: We likely need it here too.

## core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala: ##
@@ -111,6 +115,40 @@ abstract class BaseConsumerTest extends AbstractConsumerTest {
 }
 object BaseConsumerTest {
+  // We want to test the following combinations:
+  // * ZooKeeper and the generic group protocol
+  // * KRaft and the generic group protocol
+  // * KRaft with the new group coordinator enabled and the consumer group protocol
+  def getTestQuorumAndGroupProtocolParametersAll() : java.util.stream.Stream[Arguments] = {
+    java.util.stream.Stream.of(
+        Arguments.of("zk", "generic"),
+        Arguments.of("kraft", "generic"),
+        Arguments.of("kraft+kip848", "consumer"))

Review Comment: We also need to test the `generic` with `kraft+kip848`. This is what all the tests with `kraft+kip848` prior to your change did.
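Editor's sketch of the parameter matrix once the requested `kraft+kip848` plus `generic` combination is added. This is plain Scala with no JUnit dependency: in the actual test each pair would be wrapped in `Arguments.of(...)` and returned from a `@MethodSource` provider. The object name, the value names, and the `isValid` rule are hypothetical; the rule only encodes the constraint stated in this thread (the `consumer` protocol runs with the new group coordinator on KRaft).

```scala
// Hypothetical model of the (quorum, groupProtocol) combinations discussed above.
object QuorumAndGroupProtocolMatrix {
  // All combinations, including the generic protocol on kraft+kip848.
  val all: Seq[(String, String)] = Seq(
    ("zk", "generic"),
    ("kraft", "generic"),
    ("kraft+kip848", "generic"),
    ("kraft+kip848", "consumer"))

  // Subset for tests that only work with the generic group protocol.
  val genericOnly: Seq[(String, String)] =
    all.filter { case (_, protocol) => protocol == "generic" }

  // Assumed constraint: "consumer" is only paired with "kraft+kip848".
  def isValid(quorum: String, groupProtocol: String): Boolean =
    groupProtocol != "consumer" || quorum == "kraft+kip848"
}
```

Deriving `genericOnly` from `all` keeps the two lists from drifting apart when a combination is added, which is the situation the two review comments describe.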
Re: [PR] KAFKA-15544: Enable integration tests for new consumer [kafka]
AndrewJSchofield commented on PR #14758: URL: https://github.com/apache/kafka/pull/14758#issuecomment-1825499807 This PR now reflects the changes in KAFKA-14781 and also tests the new consumer.
Re: [PR] KAFKA-15544: Enable integration tests for new consumer [kafka]
AndrewJSchofield commented on code in PR #14758: URL: https://github.com/apache/kafka/pull/14758#discussion_r1404213789

## core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala: ##
@@ -1947,4 +2082,215 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     consumer2.close()
   }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("generic", "consumer"))
+  def testAssignAndCommitAsync(groupProtocol: String): Unit = {
+    val props = new Properties()
+    props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol)
+    val consumer = createConsumer(configOverrides = props)
+    val producer = createProducer()
+    val numRecords = 1
+    val startingTimestamp = System.currentTimeMillis()
+    val cb = new CountConsumerCommitCallback
+    sendRecords(producer, numRecords, tp, startingTimestamp = startingTimestamp)
+    consumer.assign(List(tp).asJava)
+    consumer.commitAsync(cb)
+    TestUtils.pollUntilTrue(consumer, () => cb.successCount >= 1 || cb.lastError.isDefined,
+      "Failed to observe commit callback before timeout", waitTimeMs = 1)
+    val committedOffset = consumer.committed(Set(tp).asJava)
+    assertNotNull(committedOffset)
+    // No valid fetch position due to the absence of consumer.poll; and therefore no offset was committed to
+    // tp. The committed offset should be null. This is intentional.
+    assertNull(committedOffset.get(tp))
+    assertTrue(consumer.assignment.contains(tp))
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("generic", "consumer"))
+  def testAssignAndCommitSync(groupProtocol: String): Unit = {
+    val props = new Properties()
+    props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol)
+    val consumer = createConsumer(configOverrides = props)
+    val producer = createProducer()
+    val numRecords = 1
+    val startingTimestamp = System.currentTimeMillis()
+    sendRecords(producer, numRecords, tp, startingTimestamp = startingTimestamp)
+    consumer.assign(List(tp).asJava)
+    consumer.commitSync()
+    val committedOffset = consumer.committed(Set(tp).asJava)
+    assertNotNull(committedOffset)
+    // No valid fetch position due to the absence of consumer.poll; and therefore no offset was committed to
+    // tp. The committed offset should be null. This is intentional.
+    assertNull(committedOffset.get(tp))
+    assertTrue(consumer.assignment.contains(tp))
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("generic", "consumer"))
+  def testAssignAndCommitSyncAllConsumed(groupProtocol: String): Unit = {
+    val numRecords = 1
+
+    val producer = createProducer()
+    val startingTimestamp = System.currentTimeMillis()
+    sendRecords(producer, numRecords, tp, startingTimestamp = startingTimestamp)
+
+    val props = new Properties()
+    props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol)
+    val consumer = createConsumer(configOverrides = props)
+    consumer.assign(List(tp).asJava)
+    consumer.seek(tp, 0)
+    consumeAndVerifyRecords(consumer = consumer, numRecords, startingOffset = 0, startingTimestamp = startingTimestamp)
+
+    consumer.commitSync()
+    val committedOffset = consumer.committed(Set(tp).asJava)
+    assertNotNull(committedOffset)
+    assertNotNull(committedOffset.get(tp))
+    assertEquals(numRecords, committedOffset.get(tp).offset())
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("generic", "consumer"))
+  def testAssignAndConsume(groupProtocol: String): Unit = {
+    val numRecords = 10
+
+    val producer = createProducer()
+    val startingTimestamp = System.currentTimeMillis()
+    sendRecords(producer, numRecords, tp, startingTimestamp = startingTimestamp)
+
+    val props = new Properties()
+    props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol)
+    val consumer = createConsumer(configOverrides = props,
+      configsToRemove = List(ConsumerConfig.GROUP_ID_CONFIG))
+    consumer.assign(List(tp).asJava)
+    consumeAndVerifyRecords(consumer = consumer, numRecords, startingOffset = 0, startingTimestamp = startingTimestamp)
+
+    assertEquals(numRecords, consumer.position(tp))
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("generic", "consumer"))
+  def testAssignAndConsumeSkippingPosition(groupProtocol: String): Unit = {
+    val numRecords = 10
+
+    val producer = createProducer()
+    val startingTimestamp = System.currentTimeMillis()
+    sendRecords(producer, numRecords, tp, startingTimestamp = startingTimestamp)
+
+    val props = new Properties()
+    props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol)
+    val consumer = createConsumer(configOverrides = props,
+      configsToRemove = List(ConsumerConfig.GROUP_ID_CONFIG))
+    consumer.assign(List(tp).asJava)
+    val offset = 1
+    consumer.seek(tp, offset)
+    consumeAndVerifyRecords(consumer = consumer, numRecords - offset, startingOffset = off
Re: [PR] KAFKA-15544: Enable integration tests for new consumer [kafka]
AndrewJSchofield commented on code in PR #14758: URL: https://github.com/apache/kafka/pull/14758#discussion_r1404208505

## core/src/test/scala/kafka/utils/TestInfoUtils.scala: ##
@@ -39,6 +39,12 @@ object TestInfoUtils {
       } else {
         throw new RuntimeException(s"Unknown quorum value")
       }
+    } else if (testInfo.getDisplayName().contains("groupProtocol=")) {

Review Comment: I've replaced this in the new commit.
Re: [PR] KAFKA-15544: Enable integration tests for new consumer [kafka]
AndrewJSchofield commented on code in PR #14758: URL: https://github.com/apache/kafka/pull/14758#discussion_r1404207856

## core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala: ##
@@ -34,13 +35,15 @@ import scala.collection.Seq
  */
 abstract class BaseConsumerTest extends AbstractConsumerTest {
-  @Test
-  def testSimpleConsumption(): Unit = {
+  @ParameterizedTest
+  @ValueSource(strings = Array("generic", "consumer"))

Review Comment: This has been replaced with a `MethodSource` that is capable of returning whatever combination we want.
Re: [PR] KAFKA-15544: Enable integration tests for new consumer [kafka]
dajac commented on PR #14758: URL: https://github.com/apache/kafka/pull/14758#issuecomment-1824112828 I just merged https://github.com/apache/kafka/pull/14781. Thanks!
Re: [PR] KAFKA-15544: Enable integration tests for new consumer [kafka]
AndrewJSchofield commented on code in PR #14758: URL: https://github.com/apache/kafka/pull/14758#discussion_r1403093845

## core/src/test/scala/kafka/utils/TestInfoUtils.scala: ##
@@ -39,6 +39,12 @@ object TestInfoUtils {
       } else {
         throw new RuntimeException(s"Unknown quorum value")
       }
+    } else if (testInfo.getDisplayName().contains("groupProtocol=")) {

Review Comment: It's because the parameterized tests in `PlaintextConsumerTest` choose between "generic" and "consumer". The new consumer ("consumer") is only supported for KRaft clusters, so it's necessary to make the test use KRaft in this instance.
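Editor's sketch of the decision described above: a test whose display name carries only a `groupProtocol=` parameter should still be routed to a KRaft cluster when that protocol is `consumer`. The thread shows only the `else if` condition, not the body of that branch, so the body below is an assumption, as are the object name, the function name, and the display-name formats. It takes a plain string rather than a JUnit `TestInfo` so that it runs on its own.

```scala
// Hypothetical helper; not the real TestInfoUtils.
object QuorumInference {
  // Returns true when the test described by displayName should use KRaft.
  def isKRaft(displayName: String): Boolean = {
    if (displayName.contains("quorum=")) {
      if (displayName.contains("quorum=kraft")) true
      else if (displayName.contains("quorum=zk")) false
      else throw new RuntimeException("Unknown quorum value")
    } else if (displayName.contains("groupProtocol=")) {
      // Assumed branch body: the new consumer needs KRaft, the generic
      // protocol keeps the default (ZooKeeper in this sketch).
      displayName.contains("groupProtocol=consumer")
    } else {
      false
    }
  }
}
```

A later comment in this thread notes that the check was replaced once the tests moved to an explicit quorum and group protocol pair, which makes this kind of inference unnecessary.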
Re: [PR] KAFKA-15544: Enable integration tests for new consumer [kafka]
AndrewJSchofield commented on code in PR #14758: URL: https://github.com/apache/kafka/pull/14758#discussion_r1403092072

## core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala: ##
@@ -277,8 +294,10 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     assertEquals(500, anotherConsumer.committed(Set(tp2).asJava).get(tp2).offset)
   }
-  @Test
-  def testAutoCommitOnCloseAfterWakeup(): Unit = {
+  @ParameterizedTest
+  @ValueSource(strings = Array("generic")) // close() is not committing offsets in consumer group protocol

Review Comment: I agree with the idea, but I will use a less wordy formulation such as "temporarily". I expect a bunch of these will be resolved in the next two weeks and we can run most of the tests across both consumers.
Re: [PR] KAFKA-15544: Enable integration tests for new consumer [kafka]
AndrewJSchofield commented on PR #14758: URL: https://github.com/apache/kafka/pull/14758#issuecomment-1824026135 @dajac I suggest you simply merge #14781 and let me deal with the fall-out. This one is not going to be far behind.
Re: [PR] KAFKA-15544: Enable integration tests for new consumer [kafka]
dajac commented on PR #14758: URL: https://github.com/apache/kafka/pull/14758#issuecomment-1822554162 @AndrewJSchofield We are about to merge https://github.com/apache/kafka/pull/14781. Is it possible to build this one on top of it?
Re: [PR] KAFKA-15544: Enable integration tests for new consumer [kafka]
cadonna commented on code in PR #14758: URL: https://github.com/apache/kafka/pull/14758#discussion_r1401760510

## core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala: ##
@@ -277,8 +294,10 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     assertEquals(500, anotherConsumer.committed(Set(tp2).asJava).get(tp2).offset)
   }
-  @Test
-  def testAutoCommitOnCloseAfterWakeup(): Unit = {
+  @ParameterizedTest
+  @ValueSource(strings = Array("generic")) // close() is not committing offsets in consumer group protocol

Review Comment: This is just a transient state, isn't it? At least https://issues.apache.org/jira/browse/KAFKA-15327 says that committing on close is planned. If KAFKA-15327 is still valid, can we formulate this comment accordingly like `close() is not committing offsets in consumer group protocol for now but it should when implementation is complete`.

## core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala: ##
@@ -1137,18 +1202,23 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     }
   }
-  @Test
-  def testMultiConsumerSessionTimeoutOnStopPolling(): Unit = {
+  @ParameterizedTest
+  @ValueSource(strings = Array("generic")) // ConsumerRebalanceListener not supported for consumer group protocol

Review Comment: I guess the `ConsumerRebalanceListener` is also something that will be supported in the future by the consumer group protocol.

## core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala: ##
@@ -537,33 +574,41 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     awaitAssignment(consumer, shrunkenAssignment)
   }
-  @Test
-  def testPartitionsFor(): Unit = {
+  @ParameterizedTest
+  @ValueSource(strings = Array("generic")) // partitionsFor not supported for consumer group protocol
+  def testPartitionsFor(groupProtocol: String): Unit = {
     val numParts = 2
     createTopic("part-test", numParts, 1)
+    this.consumerConfig.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol)
     val consumer = createConsumer()
     val parts = consumer.partitionsFor("part-test")
     assertNotNull(parts)
     assertEquals(2, parts.size)
   }
-  @Test
-  def testPartitionsForAutoCreate(): Unit = {
+  @ParameterizedTest
+  @ValueSource(strings = Array("generic")) // partitionsFor not supported for consumer group protocol
+  def testPartitionsForAutoCreate(groupProtocol: String): Unit = {
+    this.consumerConfig.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol)
     val consumer = createConsumer()
     // First call would create the topic
     consumer.partitionsFor("non-exist-topic")
     val partitions = consumer.partitionsFor("non-exist-topic")
     assertFalse(partitions.isEmpty)
   }
-  @Test
-  def testPartitionsForInvalidTopic(): Unit = {
+  @ParameterizedTest
+  @ValueSource(strings = Array("generic")) // partitionsFor not supported for consumer group protocol

Review Comment: See my comment above about the functionality being supported in future.

## core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala: ##
@@ -1137,18 +1202,23 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     }
   }
-  @Test
-  def testMultiConsumerSessionTimeoutOnStopPolling(): Unit = {
+  @ParameterizedTest
+  @ValueSource(strings = Array("generic")) // ConsumerRebalanceListener not supported for consumer group protocol
+  def testMultiConsumerSessionTimeoutOnStopPolling(groupProtocol: String): Unit = {
+    this.consumerConfig.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol)
     runMultiConsumerSessionTimeoutTest(false)
   }
-  @Test
-  def testMultiConsumerSessionTimeoutOnClose(): Unit = {
+  @ParameterizedTest
+  @ValueSource(strings = Array("generic")) // ConsumerRebalanceListener not supported for consumer group protocol
+  def testMultiConsumerSessionTimeoutOnClose(groupProtocol: String): Unit = {
+    this.consumerConfig.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol)
     runMultiConsumerSessionTimeoutTest(true)
   }
-  @Test
-  def testInterceptors(): Unit = {
+  @ParameterizedTest
+  @ValueSource(strings = Array("generic")) // Consumer interceptors not implemented for consumer group protocol

Review Comment: Will interceptors also be supported?

## core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala: ##
@@ -1137,18 +1202,23 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     }
   }
-  @Test
-  def testMultiConsumerSessionTimeoutOnStopPolling(): Unit = {
+  @ParameterizedTest
+  @ValueSource(strings = Array("generic")) // ConsumerRebalanceListener not supported for consumer group protocol
+  def testMultiConsumerSessionTimeoutOnStopPolling(groupProtocol: String): Unit = {
+    this.consumerConfig.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol)
     runMultiConsumerS
Re: [PR] KAFKA-15544: Enable integration tests for new consumer [kafka]
cadonna commented on PR #14758: URL: https://github.com/apache/kafka/pull/14758#issuecomment-1821395496 Done!
Re: [PR] KAFKA-15544: Enable integration tests for new consumer [kafka]
kirktrue commented on PR #14758: URL: https://github.com/apache/kafka/pull/14758#issuecomment-1821388129 @cadonna / @lucasbru: can we add the `ctr` label to this PR?
Re: [PR] KAFKA-15544: Enable integration tests for new consumer [kafka]
AndrewJSchofield commented on PR #14758: URL: https://github.com/apache/kafka/pull/14758#issuecomment-1820506198 Converting back to draft. This PR depends on https://github.com/apache/kafka/pull/14801 being merged. This is ready for review, but will not build cleanly until the other PR lands.
Re: [PR] KAFKA-15544: Enable integration tests for new consumer [kafka]
kirktrue commented on PR #14758: URL: https://github.com/apache/kafka/pull/14758#issuecomment-1811599069 @philipnee can you tag with `ctr` and `KIP-848` 🥺
Re: [PR] KAFKA-15544: Enable integration tests for new consumer [kafka]
AndrewJSchofield commented on code in PR #14758: URL: https://github.com/apache/kafka/pull/14758#discussion_r1393368519

## core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala: ##
@@ -1805,17 +1909,14 @@ class PlaintextConsumerTest extends BaseConsumerTest {
       s"The current assignment is ${consumer.assignment()}")
   }
-  @Test
-  def testConsumingWithNullGroupId(): Unit = {
+  @ParameterizedTest
+  @ValueSource(strings = Array("generic")) // Null group ID only supported for generic group protocol
+  def testConsumingWithNullGroupId(groupProtocol: String): Unit = {
     val topic = "test_topic"
-    val partition = 0;
+    val partition = 0

Review Comment: I missed one :)
Re: [PR] KAFKA-15544: Enable integration tests for new consumer [kafka]
AndrewJSchofield commented on code in PR #14758:
URL: https://github.com/apache/kafka/pull/14758#discussion_r1393368202

## core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala: ##
@@ -1805,17 +1909,14 @@ class PlaintextConsumerTest extends BaseConsumerTest {
       s"The current assignment is ${consumer.assignment()}")
   }
-  @Test
-  def testConsumingWithNullGroupId(): Unit = {
+  @ParameterizedTest
+  @ValueSource(strings = Array("generic")) // Null group ID only supported for generic group protocol
+  def testConsumingWithNullGroupId(groupProtocol: String): Unit = {
     val topic = "test_topic"
-    val partition = 0;
+    val partition = 0
     val tp = new TopicPartition(topic, partition)
     createTopic(topic, 1, 1)
-    TestUtils.waitUntilTrue(() => {
-      this.zkClient.topicExists(topic)
-    }, "Failed to create topic")
-

Review Comment:
I'm not convinced we ever did. Again, this is ZK-specific. The tests work on both variants without this check.
Re: [PR] KAFKA-15544: Enable integration tests for new consumer [kafka]
AndrewJSchofield commented on code in PR #14758:
URL: https://github.com/apache/kafka/pull/14758#discussion_r1393367240

## core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala: ##
@@ -34,13 +35,15 @@ import scala.collection.Seq
  */
 abstract class BaseConsumerTest extends AbstractConsumerTest {
-  @Test
-  def testSimpleConsumption(): Unit = {
+  @ParameterizedTest
+  @ValueSource(strings = Array("generic", "consumer"))

Review Comment:
Perhaps, but I want to be able to have a different array for each test to enable them to be turned on individually.
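For readers following this exchange, the trade-off between a source defined once and a per-test array can be sketched roughly as below. This is only an illustration, assuming JUnit 5 with junit-jupiter-params on the test classpath; the class and method names (`GroupProtocolSourceSketch`, `allGroupProtocols`, and both test methods) are hypothetical and do not come from the PR.

```scala
import java.util.stream.Stream

import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.{MethodSource, ValueSource}

// Hypothetical names throughout; this is not code from the PR.
object GroupProtocolSourceSketch {
  // Shared source, defined once. @MethodSource resolves a static factory
  // method by name; for a top-level class scalac normally generates a static
  // forwarder to this companion-object method, which is assumed here.
  def allGroupProtocols(): Stream[String] = Stream.of("generic", "consumer")
}

class GroupProtocolSourceSketch {
  // Option 1: every test that names the shared source runs once per value.
  @ParameterizedTest
  @MethodSource(Array("allGroupProtocols"))
  def runsForEverySharedProtocol(groupProtocol: String): Unit = {
    assertTrue(Set("generic", "consumer").contains(groupProtocol))
  }

  // Option 2: a per-test array, so one test can be narrowed or widened
  // without affecting any other test.
  @ParameterizedTest
  @ValueSource(strings = Array("generic"))
  def runsOnlyForGeneric(groupProtocol: String): Unit = {
    assertEquals("generic", groupProtocol)
  }
}
```

The second form matches the reasoning in the reply: each test carries its own list, so a protocol can be switched on for one test at a time.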
Re: [PR] KAFKA-15544: Enable integration tests for new consumer [kafka]
AndrewJSchofield commented on code in PR #14758:
URL: https://github.com/apache/kafka/pull/14758#discussion_r1393365674

## core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala: ##
@@ -1657,35 +1759,37 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     assertEquals(numMessages - records.count, lag.metricValue.asInstanceOf[Double], epsilon, s"The lag should be ${numMessages - records.count}")
   }
-  @Test
-  def testQuotaMetricsNotCreatedIfNoQuotasConfigured(): Unit = {
+  @ParameterizedTest
+  @ValueSource(strings = Array("generic", "consumer"))
+  def testQuotaMetricsNotCreatedIfNoQuotasConfigured(groupProtocol: String): Unit = {
     val numRecords = 1000
     val producer = createProducer()
     val startingTimestamp = System.currentTimeMillis()
     sendRecords(producer, numRecords, tp, startingTimestamp = startingTimestamp)
+    this.consumerConfig.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol)
     val consumer = createConsumer()
     consumer.assign(List(tp).asJava)
     consumer.seek(tp, 0)
     consumeAndVerifyRecords(consumer = consumer, numRecords = numRecords, startingOffset = 0, startingTimestamp = startingTimestamp)
-    def assertNoMetric(broker: KafkaServer, name: String, quotaType: QuotaType, clientId: String): Unit = {
-        val metricName = broker.metrics.metricName("throttle-time",
-          quotaType.toString,
-          "",
-          "user", "",
-          "client-id", clientId)
-        assertNull(broker.metrics.metric(metricName), "Metric should not have been created " + metricName)
+    def assertNoMetric(broker: KafkaBroker, name: String, quotaType: QuotaType, clientId: String): Unit = {
+      val metricName = broker.metrics.metricName("throttle-time",
+        quotaType.toString,
+        "",
+        "user", "",
+        "client-id", clientId)
+      assertNull(broker.metrics.metric(metricName), "Metric should not have been created " + metricName)
     }
-    servers.foreach(assertNoMetric(_, "byte-rate", QuotaType.Produce, producerClientId))
-    servers.foreach(assertNoMetric(_, "throttle-time", QuotaType.Produce, producerClientId))
-    servers.foreach(assertNoMetric(_, "byte-rate", QuotaType.Fetch, consumerClientId))
-    servers.foreach(assertNoMetric(_, "throttle-time", QuotaType.Fetch, consumerClientId))
+    brokers.foreach(assertNoMetric(_, "byte-rate", QuotaType.Produce, producerClientId))
+    brokers.foreach(assertNoMetric(_, "throttle-time", QuotaType.Produce, producerClientId))
+    brokers.foreach(assertNoMetric(_, "byte-rate", QuotaType.Fetch, consumerClientId))
+    brokers.foreach(assertNoMetric(_, "throttle-time", QuotaType.Fetch, consumerClientId))

-    servers.foreach(assertNoMetric(_, "request-time", QuotaType.Request, producerClientId))
-    servers.foreach(assertNoMetric(_, "throttle-time", QuotaType.Request, producerClientId))
-    servers.foreach(assertNoMetric(_, "request-time", QuotaType.Request, consumerClientId))
-    servers.foreach(assertNoMetric(_, "throttle-time", QuotaType.Request, consumerClientId))
+    brokers.foreach(assertNoMetric(_, "request-time", QuotaType.Request, producerClientId))
+    brokers.foreach(assertNoMetric(_, "throttle-time", QuotaType.Request, producerClientId))
+    brokers.foreach(assertNoMetric(_, "request-time", QuotaType.Request, consumerClientId))
+    brokers.foreach(assertNoMetric(_, "throttle-time", QuotaType.Request, consumerClientId))

Review Comment:
In this context, `servers` and `brokers` are interchangeable. This change makes the test work for ZK or KRaft. Previously, it was ZK-only.
Re: [PR] KAFKA-15544: Enable integration tests for new consumer [kafka]
kirktrue commented on code in PR #14758:
URL: https://github.com/apache/kafka/pull/14758#discussion_r1393305110

## core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala: ##
@@ -34,13 +35,15 @@ import scala.collection.Seq
  */
 abstract class BaseConsumerTest extends AbstractConsumerTest {
-  @Test
-  def testSimpleConsumption(): Unit = {
+  @ParameterizedTest
+  @ValueSource(strings = Array("generic", "consumer"))

Review Comment:
Is it possible to have a different type of 'source' that can be defined once vs. on each test? More of a nit, but curious.

## core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala: ##
@@ -1805,17 +1909,14 @@ class PlaintextConsumerTest extends BaseConsumerTest {
       s"The current assignment is ${consumer.assignment()}")
   }
-  @Test
-  def testConsumingWithNullGroupId(): Unit = {
+  @ParameterizedTest
+  @ValueSource(strings = Array("generic")) // Null group ID only supported for generic group protocol
+  def testConsumingWithNullGroupId(groupProtocol: String): Unit = {
     val topic = "test_topic"
-    val partition = 0;
+    val partition = 0
     val tp = new TopicPartition(topic, partition)
     createTopic(topic, 1, 1)
-    TestUtils.waitUntilTrue(() => {
-      this.zkClient.topicExists(topic)
-    }, "Failed to create topic")
-

Review Comment:
Why don't we need this now?

## core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala: ##
@@ -1657,35 +1759,37 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     assertEquals(numMessages - records.count, lag.metricValue.asInstanceOf[Double], epsilon, s"The lag should be ${numMessages - records.count}")
   }
-  @Test
-  def testQuotaMetricsNotCreatedIfNoQuotasConfigured(): Unit = {
+  @ParameterizedTest
+  @ValueSource(strings = Array("generic", "consumer"))
+  def testQuotaMetricsNotCreatedIfNoQuotasConfigured(groupProtocol: String): Unit = {
     val numRecords = 1000
     val producer = createProducer()
     val startingTimestamp = System.currentTimeMillis()
     sendRecords(producer, numRecords, tp, startingTimestamp = startingTimestamp)
+    this.consumerConfig.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol)
     val consumer = createConsumer()
     consumer.assign(List(tp).asJava)
     consumer.seek(tp, 0)
     consumeAndVerifyRecords(consumer = consumer, numRecords = numRecords, startingOffset = 0, startingTimestamp = startingTimestamp)
-    def assertNoMetric(broker: KafkaServer, name: String, quotaType: QuotaType, clientId: String): Unit = {
-        val metricName = broker.metrics.metricName("throttle-time",
-          quotaType.toString,
-          "",
-          "user", "",
-          "client-id", clientId)
-        assertNull(broker.metrics.metric(metricName), "Metric should not have been created " + metricName)
+    def assertNoMetric(broker: KafkaBroker, name: String, quotaType: QuotaType, clientId: String): Unit = {
+      val metricName = broker.metrics.metricName("throttle-time",
+        quotaType.toString,
+        "",
+        "user", "",
+        "client-id", clientId)
+      assertNull(broker.metrics.metric(metricName), "Metric should not have been created " + metricName)
     }
-    servers.foreach(assertNoMetric(_, "byte-rate", QuotaType.Produce, producerClientId))
-    servers.foreach(assertNoMetric(_, "throttle-time", QuotaType.Produce, producerClientId))
-    servers.foreach(assertNoMetric(_, "byte-rate", QuotaType.Fetch, consumerClientId))
-    servers.foreach(assertNoMetric(_, "throttle-time", QuotaType.Fetch, consumerClientId))
+    brokers.foreach(assertNoMetric(_, "byte-rate", QuotaType.Produce, producerClientId))
+    brokers.foreach(assertNoMetric(_, "throttle-time", QuotaType.Produce, producerClientId))
+    brokers.foreach(assertNoMetric(_, "byte-rate", QuotaType.Fetch, consumerClientId))
+    brokers.foreach(assertNoMetric(_, "throttle-time", QuotaType.Fetch, consumerClientId))

-    servers.foreach(assertNoMetric(_, "request-time", QuotaType.Request, producerClientId))
-    servers.foreach(assertNoMetric(_, "throttle-time", QuotaType.Request, producerClientId))
-    servers.foreach(assertNoMetric(_, "request-time", QuotaType.Request, consumerClientId))
-    servers.foreach(assertNoMetric(_, "throttle-time", QuotaType.Request, consumerClientId))
+    brokers.foreach(assertNoMetric(_, "request-time", QuotaType.Request, producerClientId))
+    brokers.foreach(assertNoMetric(_, "throttle-time", QuotaType.Request, producerClientId))
+    brokers.foreach(assertNoMetric(_, "request-time", QuotaType.Request, consumerClientId))
+    brokers.foreach(assertNoMetric(_, "throttle-time", QuotaType.Request, consumerClientId))

Review Comment:
So this is incorrect in `trunk`, right?

## core/src/test/scala/integration/kafka/