TaiJuWu commented on code in PR #19822: URL: https://github.com/apache/kafka/pull/19822#discussion_r2157889011
########## core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala: ########## @@ -98,230 +90,6 @@ class ConsumerBounceTest extends AbstractConsumerTest with Logging { } } - @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames) - @MethodSource(Array("getTestGroupProtocolParametersAll")) - def testConsumptionWithBrokerFailures(groupProtocol: String): Unit = consumeWithBrokerFailures(10) - - /* - * 1. Produce a bunch of messages - * 2. Then consume the messages while killing and restarting brokers at random - */ - def consumeWithBrokerFailures(numIters: Int): Unit = { - val numRecords = 1000 - val producer = createProducer() - producerSend(producer, numRecords) - - var consumed = 0L - val consumer = createConsumer() - - consumer.subscribe(util.List.of(topic)) - - val scheduler = new BounceBrokerScheduler(numIters) - try { - scheduler.start() - - while (scheduler.isRunning) { - val records = consumer.poll(Duration.ofMillis(100)).asScala - - for (record <- records) { - assertEquals(consumed, record.offset()) - consumed += 1 - } - - if (records.nonEmpty) { - consumer.commitSync() - assertEquals(consumer.position(tp), consumer.committed(util.Set.of(tp)).get(tp).offset) - - if (consumer.position(tp) == numRecords) { - consumer.seekToBeginning(util.List.of()) - consumed = 0 - } - } - } - } finally { - scheduler.shutdown() - } - } - - @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames) - @MethodSource(Array("getTestGroupProtocolParametersAll")) - def testSeekAndCommitWithBrokerFailures(groupProtocol: String): Unit = seekAndCommitWithBrokerFailures(5) - - def seekAndCommitWithBrokerFailures(numIters: Int): Unit = { - val numRecords = 1000 - val producer = createProducer() - producerSend(producer, numRecords) - - val consumer = createConsumer() - consumer.assign(util.List.of(tp)) - consumer.seek(tp, 0) - - // wait until all the followers have synced the last HW with leader - TestUtils.waitUntilTrue(() => 
brokerServers.forall(server => - server.replicaManager.localLog(tp).get.highWatermark == numRecords - ), "Failed to update high watermark for followers after timeout") - - val scheduler = new BounceBrokerScheduler(numIters) - try { - scheduler.start() - - while (scheduler.isRunning) { - val coin = TestUtils.random.nextInt(3) - if (coin == 0) { - info("Seeking to end of log") - consumer.seekToEnd(util.List.of()) - assertEquals(numRecords.toLong, consumer.position(tp)) - } else if (coin == 1) { - val pos = TestUtils.random.nextInt(numRecords).toLong - info("Seeking to " + pos) - consumer.seek(tp, pos) - assertEquals(pos, consumer.position(tp)) - } else if (coin == 2) { - info("Committing offset.") - consumer.commitSync() - assertEquals(consumer.position(tp), consumer.committed(java.util.Set.of(tp)).get(tp).offset) - } - } - } finally { - scheduler.shutdown() - } - } - - @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames) - @MethodSource(Array("getTestGroupProtocolParametersAll")) - def testSubscribeWhenTopicUnavailable(groupProtocol: String): Unit = { - val numRecords = 1000 - val newtopic = "newtopic" - - val consumer = createConsumer() - consumer.subscribe(util.Set.of(newtopic)) - executor.schedule(new Runnable { - def run(): Unit = createTopic(newtopic, numPartitions = brokerCount, replicationFactor = brokerCount) - }, 2, TimeUnit.SECONDS) - consumer.poll(time.Duration.ZERO) - - val producer = createProducer() - - def sendRecords(numRecords: Int, topic: String): Unit = { - var remainingRecords = numRecords - val endTimeMs = System.currentTimeMillis + 20000 - while (remainingRecords > 0 && System.currentTimeMillis < endTimeMs) { - val futures = (0 until remainingRecords).map { i => - producer.send(new ProducerRecord(topic, part, i.toString.getBytes, i.toString.getBytes)) - } - futures.map { future => - try { - future.get - remainingRecords -= 1 - } catch { - case _: Exception => - } - } - } - assertEquals(0, remainingRecords) - } - - 
val poller = new ConsumerAssignmentPoller(consumer, List(newtopic)) - consumerPollers += poller - poller.start() - sendRecords(numRecords, newtopic) - receiveExactRecords(poller, numRecords, 10000) - poller.shutdown() - - brokerServers.foreach(server => killBroker(server.config.brokerId)) - Thread.sleep(500) - restartDeadBrokers() - - val poller2 = new ConsumerAssignmentPoller(consumer, List(newtopic)) - consumerPollers += poller2 - poller2.start() - sendRecords(numRecords, newtopic) - receiveExactRecords(poller, numRecords, 10000L) - } - - @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames) - @MethodSource(Array("getTestGroupProtocolParametersAll")) - def testClose(groupProtocol: String): Unit = { - val numRecords = 10 - val producer = createProducer() - producerSend(producer, numRecords) - - checkCloseGoodPath(numRecords, "group1") - checkCloseWithCoordinatorFailure(numRecords, "group2", "group3") - checkCloseWithClusterFailure(numRecords, "group4", "group5", groupProtocol) - } - - /** - * Consumer is closed while cluster is healthy. Consumer should complete pending offset commits - * and leave group. New consumer instance should be able join group and start consuming from - * last committed offset. - */ - private def checkCloseGoodPath(numRecords: Int, groupId: String): Unit = { - val consumer = createConsumerAndReceive(groupId, manualAssign = false, numRecords) - val future = submitCloseAndValidate(consumer, Long.MaxValue, None, gracefulCloseTimeMs) - future.get - checkClosedState(groupId, numRecords) - } - - /** - * Consumer closed while coordinator is unavailable. Close of consumers using group - * management should complete after commit attempt even though commits fail due to rebalance. - * Close of consumers using manual assignment should complete with successful commits since a - * broker is available. 
- */ - private def checkCloseWithCoordinatorFailure(numRecords: Int, dynamicGroup: String, manualGroup: String): Unit = { - val consumer1 = createConsumerAndReceive(dynamicGroup, manualAssign = false, numRecords) - val consumer2 = createConsumerAndReceive(manualGroup, manualAssign = true, numRecords) - - killBroker(findCoordinator(dynamicGroup)) - killBroker(findCoordinator(manualGroup)) - - submitCloseAndValidate(consumer1, Long.MaxValue, None, gracefulCloseTimeMs).get - submitCloseAndValidate(consumer2, Long.MaxValue, None, gracefulCloseTimeMs).get - - restartDeadBrokers() - checkClosedState(dynamicGroup, 0) - checkClosedState(manualGroup, numRecords) - } - - private def findCoordinator(group: String): Int = { - val request = new FindCoordinatorRequest.Builder(new FindCoordinatorRequestData() - .setKeyType(FindCoordinatorRequest.CoordinatorType.GROUP.id) - .setCoordinatorKeys(util.List.of(group))).build() - var nodeId = -1 - TestUtils.waitUntilTrue(() => { - val response = connectAndReceive[FindCoordinatorResponse](request) - nodeId = response.node.id - response.error == Errors.NONE - }, s"Failed to find coordinator for group $group") - nodeId - } - - /** - * Consumer is closed while all brokers are unavailable. Cannot rebalance or commit offsets since - * there is no coordinator, but close should timeout and return. If close is invoked with a very - * large timeout, close should timeout after request timeout. 
- */ - private def checkCloseWithClusterFailure(numRecords: Int, group1: String, group2: String, - groupProtocol: String): Unit = { - val consumer1 = createConsumerAndReceive(group1, manualAssign = false, numRecords) - - val requestTimeout = 6000 - if (groupProtocol.equalsIgnoreCase(GroupProtocol.CLASSIC.name)) { - this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "5000") - this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "1000") - } - this.consumerConfig.setProperty(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, requestTimeout.toString) - val consumer2 = createConsumerAndReceive(group2, manualAssign = true, numRecords) - - brokerServers.foreach(server => killBroker(server.config.brokerId)) - val closeTimeout = 2000 - val future1 = submitCloseAndValidate(consumer1, closeTimeout, None, Some(closeTimeout)) - val future2 = submitCloseAndValidate(consumer2, Long.MaxValue, None, Some(requestTimeout)) - future1.get - future2.get - } - /** * If we have a running consumer group of size N, configure consumer.group.max.size = N-1 and restart all brokers, * the group should be forced to rebalance when it becomes hosted on a Coordinator with the new config. Review Comment: This test is disabled, and I'm afraid of missing something when migrating it, so I'm leaving it here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org