Re: [PR] KAFKA-16406: Splitting consumer integration test [kafka]

2024-03-25 Thread via GitHub


lucasbru merged PR #15535:
URL: https://github.com/apache/kafka/pull/15535


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16406: Splitting consumer integration test [kafka]

2024-03-25 Thread via GitHub


lucasbru commented on PR #15535:
URL: https://github.com/apache/kafka/pull/15535#issuecomment-2018145845

   Two related flaky tests, 
`kafka.api.PlaintextConsumerTest.testExpandingTopicSubscriptions(String, 
String)[4]` and 
`org.apache.kafka.tools.consumer.group.DescribeConsumerGroupTest.testDescribeExistingGroupWithNoMembers(String,
 String)[4]`. But doesn't seem to be caused by this PR, as they have flaked on 
trunk before.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16406: Splitting consumer integration test [kafka]

2024-03-25 Thread via GitHub


lianetm commented on PR #15535:
URL: https://github.com/apache/kafka/pull/15535#issuecomment-2018010890

   Thanks both for the comments. I do like @AndrewJSchofield 's idea of having 
it all categorized but I ended up with several unrelated tests that seemed hard 
to group in a sensible way. Also the fact that @lucasbru pointed out, that I 
wanted to keep the way of running the abstract `BaseConsumerTest` made me keep 
the `PlaintextConsumerTest` as a kind of generic one. Makes sense? 
   
   @lucasbru just to confirm, I did an exact copy of the contents, only changes 
were the additions mentioned in the comments. 
   
   For the record, I think there's more room for improvement with a similar 
approach in other tests, I will follow up on that, probably following with the 
`PlaintextAdminIntegrationTest` which is in a similar (even worse) situation 
regarding run times.
   
   Thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16406: Splitting consumer integration test [kafka]

2024-03-25 Thread via GitHub


lucasbru commented on PR #15535:
URL: https://github.com/apache/kafka/pull/15535#issuecomment-2017575807

   @AndrewJSchofield I think Lianet did not remove the `PlaintextConsumerTest` 
to makes sure we still run the tests in `BaseConsumerTest` in the plaintext 
case once. I guess we could make one of the categorized test suites inherit 
from `BaseConsumerTest`, but I think I'd prefer more just keeping 
`PlaintextConsumerTest`.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16406: Splitting consumer integration test [kafka]

2024-03-25 Thread via GitHub


lucasbru commented on code in PR #15535:
URL: https://github.com/apache/kafka/pull/15535#discussion_r1537284419


##
core/src/test/scala/integration/kafka/api/PlaintextConsumerAssignTest.scala:
##
@@ -0,0 +1,246 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements. See the NOTICE
+  * file distributed with this work for additional information regarding 
copyright ownership. The ASF licenses this file
+  * to You under the Apache License, Version 2.0 (the "License"); you may not 
use this file except in compliance with the
+  * License. You may obtain a copy of the License at
+  *
+  * http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on
+  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either 
express or implied. See the License for the
+  * specific language governing permissions and limitations under the License.
+  */
+package kafka.api
+
+import kafka.utils.{TestInfoUtils, TestUtils}
+import java.util.Properties
+import org.apache.kafka.clients.consumer._
+import org.apache.kafka.common.TopicPartition
+import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.api.Timeout
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.{Arguments, MethodSource}
+
+import org.apache.kafka.common.PartitionInfo
+import java.util.stream.Stream
+import scala.jdk.CollectionConverters._
+import scala.collection.mutable
+import org.junit.jupiter.params.provider.CsvSource
+
+/**
+ * Integration tests for the consumer that covers logic related to manual 
assignment.
+ */
+@Timeout(600)
+class PlaintextConsumerAssignTest extends AbstractConsumerTest {
+
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testAssignAndCommitAsyncNotCommitted(quorum: String, groupProtocol: 
String): Unit = {
+val props = new Properties()
+val consumer = createConsumer(configOverrides = props)
+val producer = createProducer()
+val numRecords = 1
+val startingTimestamp = System.currentTimeMillis()
+val cb = new CountConsumerCommitCallback
+sendRecords(producer, numRecords, tp, startingTimestamp = 
startingTimestamp)
+consumer.assign(List(tp).asJava)
+consumer.commitAsync(cb)
+TestUtils.pollUntilTrue(consumer, () => cb.successCount >= 1 || 
cb.lastError.isDefined,
+  "Failed to observe commit callback before timeout", waitTimeMs = 1)
+val committedOffset = consumer.committed(Set(tp).asJava)
+assertNotNull(committedOffset)
+// No valid fetch position due to the absence of consumer.poll; and 
therefore no offset was committed to
+// tp. The committed offset should be null. This is intentional.
+assertNull(committedOffset.get(tp))
+assertTrue(consumer.assignment.contains(tp))
+  }
+
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testAssignAndCommitSyncNotCommitted(quorum: String, groupProtocol: 
String): Unit = {
+val props = new Properties()
+val consumer = createConsumer(configOverrides = props)
+val producer = createProducer()
+val numRecords = 1
+val startingTimestamp = System.currentTimeMillis()
+sendRecords(producer, numRecords, tp, startingTimestamp = 
startingTimestamp)
+consumer.assign(List(tp).asJava)
+consumer.commitSync()
+val committedOffset = consumer.committed(Set(tp).asJava)
+assertNotNull(committedOffset)
+// No valid fetch position due to the absence of consumer.poll; and 
therefore no offset was committed to
+// tp. The committed offset should be null. This is intentional.
+assertNull(committedOffset.get(tp))
+assertTrue(consumer.assignment.contains(tp))
+  }
+
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testAssignAndCommitSyncAllConsumed(quorum: String, groupProtocol: 
String): Unit = {
+val numRecords = 1
+
+val producer = createProducer()
+val startingTimestamp = System.currentTimeMillis()
+sendRecords(producer, numRecords, tp, startingTimestamp = 
startingTimestamp)
+
+val props = new Properties()
+val consumer = createConsumer(configOverrides = props)
+consumer.assign(List(tp).asJava)
+consumer.seek(tp, 0)
+consumeAndVerifyRecords(consumer = consumer, numRecords, startingOffset = 
0, startingTimestamp = startingTimestamp)
+
+consumer.commitSync()
+val committedOffset = consumer.committed(Set(tp).asJava)
+assertNotNull(committedOffset)
+assertNotNull(committedOffset.get(tp))
+assertEquals(numRecords, 

Re: [PR] KAFKA-16406: Splitting consumer integration test [kafka]

2024-03-22 Thread via GitHub


lianetm commented on PR #15535:
URL: https://github.com/apache/kafka/pull/15535#issuecomment-2015603774

   Hey @lucasbru , could you take a look if you have a chance? No changes in 
logic other than the addition I pointed out (see comment above). With this 
split I've been getting the run times for the main PlainText under 1h (used to 
between 3h-4h), and we're adding several new integration tests, all under 
30mins. We'll get more data as we continue to run builds with the change but 
should be a good step moving forward and looking for parallelization. Thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org