This is an automated email from the ASF dual-hosted git repository.
ijuma pushed a commit to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.8 by this push:
new 77580f8 KAFKA-12455: Fix
OffsetValidationTest.test_broker_rolling_bounce failure with Raft (#10322)
77580f8 is described below
commit 77580f8302057baa48d72daf8359f77187af4ac2
Author: Ron Dagostino <[email protected]>
AuthorDate: Tue Mar 16 16:57:29 2021 -0400
KAFKA-12455: Fix OffsetValidationTest.test_broker_rolling_bounce failure
with Raft (#10322)
This test was failing when used with a Raft-based metadata quorum but
succeeding with a
ZooKeeper-based quorum. This patch increases the consumers' session
timeouts to 30 seconds,
which fixes the Raft case and also eliminates flakiness that has
historically existed in the
Zookeeper case.
This patch also fixes a minor logging bug in
RaftReplicaManager.endMetadataChangeDeferral() that
was discovered during the debugging of this issue, and it adds an extra
logging statement in RaftReplicaManager.handleMetadataRecords() when a single
metadata batch is applied to mirror
the same logging statement that occurs when deferred metadata changes are
applied.
In the Raft system test case the consumer was sometimes receiving a
METADATA response with just
1 alive broker, and then when that broker rolled the consumer wouldn't know
about any alive nodes.
It would have to wait until the broker returned before it could reconnect,
and by that time the group
coordinator on the second broker would have timed-out the client and
initiated a group rebalance. The
test explicitly checks that no rebalances occur, so the test would fail. It
turns out that the reason why
the ZooKeeper configuration wasn't seeing rebalances was just plain luck.
The brokers' metadata
caches in the ZooKeeper configuration show 1 alive broker even more
frequently than the Raft
configuration does. If we tweak the metadata.max.age.ms value on the
consumers we can easily
get the ZooKeeper test to fail, and in fact this system test has
historically been flaky for the
ZooKeeper configuration. We can get the test to pass by setting
session.timeout.ms=30000 (which
is longer than the roll time of any broker), or we can increase the broker
count so that the client
never sees a METADATA response with just a single alive broker and
therefore never loses contact
with the cluster for an extended period of time. We have plenty of system
tests with 3+ brokers, so
we choose to keep this test with 2 brokers and increase the session timeout.
Reviewers: Ismael Juma <[email protected]>
---
.../main/scala/kafka/server/RaftReplicaManager.scala | 18 ++++++++----------
tests/kafkatest/tests/client/consumer_test.py | 8 ++++++++
2 files changed, 16 insertions(+), 10 deletions(-)
diff --git a/core/src/main/scala/kafka/server/RaftReplicaManager.scala
b/core/src/main/scala/kafka/server/RaftReplicaManager.scala
index 143709d..37dc3e1 100644
--- a/core/src/main/scala/kafka/server/RaftReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/RaftReplicaManager.scala
@@ -132,8 +132,8 @@ class RaftReplicaManager(config: KafkaConfig,
def endMetadataChangeDeferral(onLeadershipChange: (Iterable[Partition],
Iterable[Partition]) => Unit): Unit = {
val startMs = time.milliseconds()
- val partitionsMadeFollower = mutable.Set[Partition]()
- val partitionsMadeLeader = mutable.Set[Partition]()
+ var partitionsMadeFollower = Set.empty[Partition]
+ var partitionsMadeLeader = Set.empty[Partition]
replicaStateChangeLock synchronized {
stateChangeLogger.info(s"Applying deferred metadata changes")
val highWatermarkCheckpoints = new
LazyOffsetCheckpoints(this.highWatermarkCheckpoints)
@@ -156,14 +156,10 @@ class RaftReplicaManager(config: KafkaConfig,
}
}
- val partitionsMadeLeader = if (leaderPartitionStates.nonEmpty)
- delegate.makeLeaders(partitionsAlreadyExisting,
leaderPartitionStates, highWatermarkCheckpoints, None)
- else
- Set.empty[Partition]
- val partitionsMadeFollower = if (followerPartitionStates.nonEmpty)
- delegate.makeFollowers(partitionsAlreadyExisting, brokers,
followerPartitionStates, highWatermarkCheckpoints, None)
- else
- Set.empty[Partition]
+ if (leaderPartitionStates.nonEmpty)
+ partitionsMadeLeader =
delegate.makeLeaders(partitionsAlreadyExisting, leaderPartitionStates,
highWatermarkCheckpoints, None)
+ if (followerPartitionStates.nonEmpty)
+ partitionsMadeFollower =
delegate.makeFollowers(partitionsAlreadyExisting, brokers,
followerPartitionStates, highWatermarkCheckpoints, None)
// We need to transition anything that hasn't transitioned from
Deferred to Offline to the Online state.
deferredPartitionsIterator.foreach { deferredPartition =>
@@ -331,6 +327,8 @@ class RaftReplicaManager(config: KafkaConfig,
replicaFetcherManager.shutdownIdleFetcherThreads()
replicaAlterLogDirsManager.shutdownIdleFetcherThreads()
onLeadershipChange(partitionsBecomeLeader, partitionsBecomeFollower)
+ stateChangeLogger.info(s"Metadata batch $metadataOffset: applied
${partitionsBecomeLeader.size + partitionsBecomeFollower.size} partitions: " +
+ s"${partitionsBecomeLeader.size} leader(s) and
${partitionsBecomeFollower.size} follower(s)")
}
// TODO: we should move aside log directories which have been deleted
rather than
// purging them from the disk immediately.
diff --git a/tests/kafkatest/tests/client/consumer_test.py
b/tests/kafkatest/tests/client/consumer_test.py
index f417480..49e9331 100644
--- a/tests/kafkatest/tests/client/consumer_test.py
+++ b/tests/kafkatest/tests/client/consumer_test.py
@@ -93,6 +93,14 @@ class OffsetValidationTest(VerifiableConsumerTest):
partition = TopicPartition(self.TOPIC, 0)
producer = self.setup_producer(self.TOPIC)
+ # The consumers' session timeouts must exceed the time it takes for a
broker to roll. Consumers are likely
+ # to see cluster metadata consisting of just a single alive broker in
the case where the cluster has just 2
+ # brokers and the cluster is rolling (which is what is happening
here). When the consumer sees a single alive
+ # broker, and then that broker rolls, the consumer will be unable to
connect to the cluster until that broker
+ # completes its roll. In the meantime, the consumer group will move
to the group coordinator on the other
+ # broker, and that coordinator will fail the consumer and trigger a
group rebalance if its session times out.
+ # This test is asserting that no rebalances occur, so we increase the
session timeout for this to be the case.
+ self.session_timeout_sec = 30
consumer = self.setup_consumer(self.TOPIC)
producer.start()