This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.8 by this push:
     new 77580f8  KAFKA-12455: Fix OffsetValidationTest.test_broker_rolling_bounce failure with Raft (#10322)
77580f8 is described below

commit 77580f8302057baa48d72daf8359f77187af4ac2
Author: Ron Dagostino <[email protected]>
AuthorDate: Tue Mar 16 16:57:29 2021 -0400

    KAFKA-12455: Fix OffsetValidationTest.test_broker_rolling_bounce failure with Raft (#10322)
    
    This test was failing when used with a Raft-based metadata quorum but
    succeeding with a ZooKeeper-based quorum. This patch increases the
    consumers' session timeouts to 30 seconds, which fixes the Raft case
    and also eliminates flakiness that has historically existed in the
    ZooKeeper case.
    
    This patch also fixes a minor logging bug in
    RaftReplicaManager.endMetadataChangeDeferral() that was discovered
    while debugging this issue, and it adds a logging statement in
    RaftReplicaManager.handleMetadataRecords() when a single metadata
    batch is applied, mirroring the statement that is logged when
    deferred metadata changes are applied.
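
    For context, the logging bug was a classic Scala shadowing mistake:
    the results were assigned to fresh local vals inside the synchronized
    block, so the outer collections read by the summary log line stayed
    empty. A minimal, runnable sketch of the pattern (simplified names,
    not the actual RaftReplicaManager code):

        object ShadowingDemo extends App {
          import scala.collection.mutable

          val partitionsMadeLeader = mutable.Set[String]() // outer set, logged below
          val lock = new Object

          lock.synchronized {
            // Bug: `val` introduces a *new* local that shadows the outer set,
            // so the result computed here never reaches the summary log line.
            val partitionsMadeLeader = Set("topic-0", "topic-1")
            println(s"inside the lock: ${partitionsMadeLeader.size}") // prints 2
          }

          // Always reports 0 -- the bug the patch fixes by assigning to an
          // outer `var` instead of declaring a shadowing `val`.
          println(s"summary: ${partitionsMadeLeader.size} partition(s) made leader")
        }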
    
    In the Raft system test case the consumer was sometimes receiving a
    METADATA response with just 1 alive broker, and then when that broker
    rolled the consumer wouldn't know about any alive nodes. It would
    have to wait until the broker returned before it could reconnect,
    and by that time the group coordinator on the second broker would
    have timed out the client and initiated a group rebalance. The test
    explicitly checks that no rebalances occur, so the test would fail.
    It turns out that the reason the ZooKeeper configuration wasn't
    seeing rebalances was just plain luck. The brokers' metadata caches
    in the ZooKeeper configuration show 1 alive broker even more
    frequently than the Raft configuration does. If we tweak the
    metadata.max.age.ms value on the consumers we can easily get the
    ZooKeeper test to fail, and in fact this system test has
    historically been flaky for the ZooKeeper configuration. We can get
    the test to pass by setting session.timeout.ms=30000 (which is
    longer than the roll time of any broker), or we can increase the
    broker count so that the client never sees a METADATA response with
    just a single alive broker and therefore never loses contact with
    the cluster for an extended period of time. We have plenty of
    system tests with 3+ brokers, so we choose to keep this test with
    2 brokers and increase the session timeout.
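
    To make the two knobs discussed above concrete, here is a minimal,
    illustrative consumer configuration using the standard Java client
    from Scala (not part of this patch; the topic, group id, and
    bootstrap address are hypothetical):

        import java.util.Properties
        import org.apache.kafka.clients.consumer.KafkaConsumer

        val props = new Properties()
        props.put("bootstrap.servers", "localhost:9092") // hypothetical address
        props.put("group.id", "offset-validation")       // hypothetical group id
        props.put("key.deserializer",
          "org.apache.kafka.common.serialization.StringDeserializer")
        props.put("value.deserializer",
          "org.apache.kafka.common.serialization.StringDeserializer")
        // Must exceed the time any single broker is down during a roll;
        // otherwise the coordinator on the surviving broker times the
        // consumer out and triggers the rebalance this test forbids.
        props.put("session.timeout.ms", "30000")
        // Lowering this makes the consumer refresh metadata more often,
        // which makes the single-alive-broker window easier to hit (the
        // tweak that reliably breaks the ZooKeeper variant as well).
        props.put("metadata.max.age.ms", "3000")

        val consumer = new KafkaConsumer[String, String](props)
        consumer.subscribe(java.util.Collections.singletonList("test_topic"))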
    
    Reviewers: Ismael Juma <[email protected]>
---
 .../main/scala/kafka/server/RaftReplicaManager.scala   | 18 ++++++++----------
 tests/kafkatest/tests/client/consumer_test.py          |  8 ++++++++
 2 files changed, 16 insertions(+), 10 deletions(-)

diff --git a/core/src/main/scala/kafka/server/RaftReplicaManager.scala b/core/src/main/scala/kafka/server/RaftReplicaManager.scala
index 143709d..37dc3e1 100644
--- a/core/src/main/scala/kafka/server/RaftReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/RaftReplicaManager.scala
@@ -132,8 +132,8 @@ class RaftReplicaManager(config: KafkaConfig,
 
   def endMetadataChangeDeferral(onLeadershipChange: (Iterable[Partition], Iterable[Partition]) => Unit): Unit = {
     val startMs = time.milliseconds()
-    val partitionsMadeFollower = mutable.Set[Partition]()
-    val partitionsMadeLeader = mutable.Set[Partition]()
+    var partitionsMadeFollower = Set.empty[Partition]
+    var partitionsMadeLeader = Set.empty[Partition]
     replicaStateChangeLock synchronized {
       stateChangeLogger.info(s"Applying deferred metadata changes")
       val highWatermarkCheckpoints = new LazyOffsetCheckpoints(this.highWatermarkCheckpoints)
@@ -156,14 +156,10 @@ class RaftReplicaManager(config: KafkaConfig,
           }
         }
 
-        val partitionsMadeLeader = if (leaderPartitionStates.nonEmpty)
-          delegate.makeLeaders(partitionsAlreadyExisting, leaderPartitionStates, highWatermarkCheckpoints, None)
-        else
-          Set.empty[Partition]
-        val partitionsMadeFollower = if (followerPartitionStates.nonEmpty)
-          delegate.makeFollowers(partitionsAlreadyExisting, brokers, followerPartitionStates, highWatermarkCheckpoints, None)
-        else
-          Set.empty[Partition]
+        if (leaderPartitionStates.nonEmpty)
+          partitionsMadeLeader = delegate.makeLeaders(partitionsAlreadyExisting, leaderPartitionStates, highWatermarkCheckpoints, None)
+        if (followerPartitionStates.nonEmpty)
+          partitionsMadeFollower = delegate.makeFollowers(partitionsAlreadyExisting, brokers, followerPartitionStates, highWatermarkCheckpoints, None)
 
         // We need to transition anything that hasn't transitioned from Deferred to Offline to the Online state.
         deferredPartitionsIterator.foreach { deferredPartition =>
@@ -331,6 +327,8 @@ class RaftReplicaManager(config: KafkaConfig,
         replicaFetcherManager.shutdownIdleFetcherThreads()
         replicaAlterLogDirsManager.shutdownIdleFetcherThreads()
         onLeadershipChange(partitionsBecomeLeader, partitionsBecomeFollower)
+        stateChangeLogger.info(s"Metadata batch $metadataOffset: applied ${partitionsBecomeLeader.size + partitionsBecomeFollower.size} partitions: " +
+          s"${partitionsBecomeLeader.size} leader(s) and ${partitionsBecomeFollower.size} follower(s)")
       }
       // TODO: we should move aside log directories which have been deleted rather than
       // purging them from the disk immediately.
diff --git a/tests/kafkatest/tests/client/consumer_test.py b/tests/kafkatest/tests/client/consumer_test.py
index f417480..49e9331 100644
--- a/tests/kafkatest/tests/client/consumer_test.py
+++ b/tests/kafkatest/tests/client/consumer_test.py
@@ -93,6 +93,14 @@ class OffsetValidationTest(VerifiableConsumerTest):
         partition = TopicPartition(self.TOPIC, 0)
 
         producer = self.setup_producer(self.TOPIC)
+        # The consumers' session timeouts must exceed the time it takes for a broker to roll.  Consumers are likely
+        # to see cluster metadata consisting of just a single alive broker in the case where the cluster has just 2
+        # brokers and the cluster is rolling (which is what is happening here).  When the consumer sees a single alive
+        # broker, and then that broker rolls, the consumer will be unable to connect to the cluster until that broker
+        # completes its roll.  In the meantime, the consumer group will move to the group coordinator on the other
+        # broker, and that coordinator will fail the consumer and trigger a group rebalance if its session times out.
+        # This test is asserting that no rebalances occur, so we increase the session timeout for this to be the case.
+        self.session_timeout_sec = 30
         consumer = self.setup_consumer(self.TOPIC)
 
         producer.start()
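
As a back-of-the-envelope check of the timing argument in the commit
message (the roll duration below is our assumption, not a number measured
by the test): the coordinator removes a consumer from the group once it
fails to heartbeat for a full session timeout, so the patched value works
only because it outlasts any single broker's roll, while the default does
not.

    object SessionTimeoutCheck extends App {
      val brokerRollMs     = 20000 // assumed worst-case time one broker is down
      val defaultSessionMs = 10000 // the consumer's default session.timeout.ms in 2.8
      val patchedSessionMs = 30000 // the value this patch sets in the test

      // The coordinator rebalances the group if a consumer cannot heartbeat
      // for a full session timeout, i.e. while its only known broker is down.
      def survivesRoll(sessionTimeoutMs: Int): Boolean = sessionTimeoutMs > brokerRollMs

      println(s"default 10s survives the roll: ${survivesRoll(defaultSessionMs)}") // false
      println(s"patched 30s survives the roll: ${survivesRoll(patchedSessionMs)}") // true
    }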
