hachikuji commented on a change in pull request #11191:
URL: https://github.com/apache/kafka/pull/11191#discussion_r686135001
##########
File path:
metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatManager.java
##########
@@ -419,6 +419,27 @@ long nextCheckTimeNs() {
}
}
+ /**
+ * Check if the oldest broker to have heartbeated has already violated the
+ * sessionTimeoutNs timeout and needs to be fenced.
+ *
+ * @return An Optional broker node id.
+ */
+ Optional<Integer> findOneStaleBroker() {
+ Optional<Integer> node = Optional.empty();
+ BrokerHeartbeatStateIterator iterator = unfenced.iterator();
+ if (iterator.hasNext()) {
+ BrokerHeartbeatState broker = iterator.next();
+ // The unfenced broker list is sorted on last contact time from each
+ // broker. If the first broker has a valid session then all do
+ if (!hasValidSession(broker)) {
+ node = Optional.of(broker.id);
Review comment:
nit: you can return here and then we don't need `node`
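A self-contained sketch of the early-return shape this nit suggests. It assumes unfenced brokers are kept sorted by last contact time (oldest first) and that a session is valid while `lastContactNs + sessionTimeoutNs >= nowNs`. `HeartbeatState` and `StaleBrokerFinder` are hypothetical, simplified stand-ins for `BrokerHeartbeatState` and `BrokerHeartbeatManager`, and the current time is passed in explicitly instead of read from a clock.

```java
import java.util.Comparator;
import java.util.Iterator;
import java.util.Optional;
import java.util.TreeSet;

// Hypothetical stand-in for BrokerHeartbeatState, reduced to the two fields the sketch needs.
final class HeartbeatState {
    final int id;
    final long lastContactNs;

    HeartbeatState(int id, long lastContactNs) {
        this.id = id;
        this.lastContactNs = lastContactNs;
    }
}

// Hypothetical, simplified heartbeat tracker; not the real BrokerHeartbeatManager.
final class StaleBrokerFinder {
    private final long sessionTimeoutNs;
    // Unfenced brokers, oldest last contact first (ties broken by broker id).
    private final TreeSet<HeartbeatState> unfenced = new TreeSet<>(
        Comparator.comparingLong((HeartbeatState b) -> b.lastContactNs)
            .thenComparingInt(b -> b.id));

    StaleBrokerFinder(long sessionTimeoutNs) {
        this.sessionTimeoutNs = sessionTimeoutNs;
    }

    // Records a heartbeat by re-inserting the broker with its new contact time.
    void touch(int id, long nowNs) {
        unfenced.removeIf(b -> b.id == id);
        unfenced.add(new HeartbeatState(id, nowNs));
    }

    void fence(int id) {
        unfenced.removeIf(b -> b.id == id);
    }

    // Assumed session rule: valid while lastContactNs + sessionTimeoutNs >= nowNs.
    private boolean hasValidSession(HeartbeatState broker, long nowNs) {
        return broker.lastContactNs + sessionTimeoutNs >= nowNs;
    }

    Optional<Integer> findOneStaleBroker(long nowNs) {
        Iterator<HeartbeatState> iterator = unfenced.iterator();
        if (iterator.hasNext()) {
            HeartbeatState broker = iterator.next();
            // Sorted on last contact time: if the oldest broker still has a
            // valid session, then every broker does.
            if (!hasValidSession(broker, nowNs)) {
                return Optional.of(broker.id); // early return, no `node` local needed
            }
        }
        return Optional.empty();
    }
}
```

Returning directly from inside the `if` removes the mutable `Optional` local and makes the single exit for the "nothing stale" case explicit.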
##########
File path:
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##########
@@ -925,18 +925,27 @@ ApiError electLeader(String topic, int partitionId, boolean uncleanOk,
return ControllerResult.of(records, null);
}
- ControllerResult<Void> maybeFenceStaleBrokers() {
+ ControllerResult<Void> maybeFenceOneStaleBroker() {
List<ApiMessageAndVersion> records = new ArrayList<>();
BrokerHeartbeatManager heartbeatManager = clusterControl.heartbeatManager();
- List<Integer> staleBrokers = heartbeatManager.findStaleBrokers();
- for (int brokerId : staleBrokers) {
+ Optional<Integer> staleBroker = heartbeatManager.findOneStaleBroker();
+ if (staleBroker.isPresent()) {
Review comment:
nit: maybe a little more concise with a lambda?
```java
heartbeatManager.findOneStaleBroker().ifPresent(brokerId -> {
// Even though multiple brokers can go stale at a time, we will process
// fencing one at a time so that the effect of fencing each broker is visible
// to the system prior to processing the next one
log.info("Fencing broker {} because its session has timed out.", brokerId);
handleBrokerFenced(brokerId, records);
heartbeatManager.fence(brokerId);
});
```
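A self-contained sketch of the `ifPresent` shape, assuming a toy manager that reports stale brokers oldest first. `ToyHeartbeatManager`, `FenceOneAtATime`, and the string records are hypothetical placeholders for `BrokerHeartbeatManager`, `ReplicationControlManager`, and `ApiMessageAndVersion`; the point is only that each call fences at most one broker, so the effect of one fencing is visible before the next one is processed.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Optional;

// Hypothetical stand-in for the heartbeat manager: stale brokers queued oldest first.
final class ToyHeartbeatManager {
    private final Deque<Integer> stale = new ArrayDeque<>();

    ToyHeartbeatManager(int... staleIds) {
        for (int id : staleIds) {
            stale.addLast(id);
        }
    }

    Optional<Integer> findOneStaleBroker() {
        return Optional.ofNullable(stale.peekFirst());
    }

    void fence(int brokerId) {
        stale.remove(brokerId); // boxes to Integer, so this is remove(Object)
    }
}

final class FenceOneAtATime {
    // Same shape as the suggested maybeFenceOneStaleBroker(): at most one broker
    // is fenced per call, and the returned records describe only that fencing.
    static List<String> maybeFenceOneStaleBroker(ToyHeartbeatManager heartbeatManager) {
        List<String> records = new ArrayList<>();
        heartbeatManager.findOneStaleBroker().ifPresent(brokerId -> {
            records.add("fence broker " + brokerId); // placeholder for handleBrokerFenced(...)
            heartbeatManager.fence(brokerId);
        });
        return records;
    }
}
```

With two stale brokers, the first call fences broker 1, the second fences broker 2, and a third call returns no records.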
##########
File path:
metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
##########
@@ -173,6 +176,112 @@ private void testDelayedConfigurationOperations(LocalLogManagerTestEnv logEnv,
assertEquals(Collections.singletonMap(BROKER0, ApiError.NONE),
future1.get());
}
+ @Test
+ public void testFenceMultipleBrokers() throws Throwable {
Review comment:
This test times out for me when run locally. I'm a little concerned that
the dependence on real time here will make it flaky. I'm satisfied with the
other tests we have in this patch. Would it be reasonable to push this test
case to a follow-up so that we can iterate on it a little bit without blocking
the patch?
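One way a follow-up could remove the dependence on real time is to have the code under test read time from an injected clock that the test advances by hand, in the spirit of the `time.sleep(...)` calls in `BrokerHeartbeatManagerTest`. This is a minimal sketch under that assumption; `ManualClock` and `SessionTracker` are hypothetical names, not classes from the patch.

```java
import java.util.function.LongSupplier;

// Hypothetical manually advanced clock: time only moves when the test says so.
final class ManualClock implements LongSupplier {
    private long nowNs;

    @Override
    public long getAsLong() {
        return nowNs;
    }

    void sleepMs(long ms) {
        nowNs += ms * 1_000_000L;
    }
}

// Hypothetical session tracker that reads time from an injected clock
// instead of calling System.nanoTime() directly.
final class SessionTracker {
    private final LongSupplier clockNs;
    private final long sessionTimeoutNs;
    private long lastContactNs;

    SessionTracker(LongSupplier clockNs, long sessionTimeoutNs) {
        this.clockNs = clockNs;
        this.sessionTimeoutNs = sessionTimeoutNs;
        this.lastContactNs = clockNs.getAsLong();
    }

    void heartbeat() {
        lastContactNs = clockNs.getAsLong();
    }

    boolean hasValidSession() {
        return lastContactNs + sessionTimeoutNs >= clockNs.getAsLong();
    }
}
```

Production code would pass `System::nanoTime`; a test passes a `ManualClock` and crosses the timeout boundary deterministically, so the assertions never race a wall clock.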
##########
File path:
metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java
##########
@@ -37,7 +42,13 @@
private final LocalLogManagerTestEnv logEnv;
public QuorumControllerTestEnv(LocalLogManagerTestEnv logEnv,
- Consumer<QuorumController.Builder> builderConsumer)
+ Consumer<QuorumController.Builder> builderConsumer)
Review comment:
nit: the indentation is a bit screwy here and below
##########
File path:
metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
##########
@@ -1029,6 +1051,22 @@ public void testCreatePartitionsFailsWithManualAssignmentWithAllFenced() throws
ctx.replicationControl.getPartition(fooId, 1));
}
+ @Test
+ public void testFenceMultipleBrokers() throws Exception {
+ ReplicationControlTestContext ctx = new ReplicationControlTestContext();
+ ReplicationControlManager replication = ctx.replicationControl;
+ ctx.registerBrokers(0, 1, 2, 3, 4);
+ ctx.unfenceBrokers(0, 1, 2, 3, 4);
+
+ Uuid fooId = ctx.createTestTopic("foo", new int[][]{
+ new int[]{1, 2, 3}, new int[]{2, 3, 4}, new int[]{0, 2, 1}}).topicId();
+
+ ctx.fenceBrokers(Utils.mkSet(2, 3));
+
+ PartitionRegistration partition0 = replication.getPartition(fooId, 0);
+ assertArrayEquals(new int[]{1}, partition0.isr);
Review comment:
Can we extend this test to assert something about the other partitions?
Maybe also useful to assert the current leader?
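To pick the extra assertions, a small model of the expected state may help. This sketch assumes that fencing removes a broker from every ISR and that a fenced leader is replaced by the first replica, in assignment order, that is still in the ISR. `FencingModel` is hypothetical and does not model what the controller does when every ISR member is fenced (it just returns `-1` there). For the topic in this test it predicts ISRs `{1}`, `{4}`, `{0, 1}` and leaders 1, 4, 0.

```java
import java.util.Arrays;
import java.util.Set;

// Hypothetical model of expected partition state after fencing; not controller code.
final class FencingModel {
    // Expected ISR: the replicas, in assignment order, that were not fenced.
    // Assumes the initial ISR equals the full replica list (a freshly created topic).
    static int[] expectedIsr(int[] replicas, Set<Integer> fenced) {
        return Arrays.stream(replicas).filter(r -> !fenced.contains(r)).toArray();
    }

    // Assumed leader rule: first replica in assignment order still in the ISR.
    // -1 is this model's own sentinel for "no unfenced ISR member".
    static int expectedLeader(int[] replicas, Set<Integer> fenced) {
        int[] isr = expectedIsr(replicas, fenced);
        return isr.length > 0 ? isr[0] : -1;
    }
}
```

The partition 1 case is the interesting one: its original leader (broker 2) is fenced, so asserting its new leader exercises leader re-election, not just ISR shrinking.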
##########
File path:
metadata/src/test/java/org/apache/kafka/controller/BrokerHeartbeatManagerTest.java
##########
@@ -93,22 +93,26 @@ public void testFindStaleBrokers() {
assertEquals(1, iter.next().id());
assertEquals(2, iter.next().id());
assertFalse(iter.hasNext());
- assertEquals(Collections.emptyList(), manager.findStaleBrokers());
+ assertEquals(Optional.empty(), manager.findOneStaleBroker());
time.sleep(5);
- assertEquals(Collections.singletonList(0), manager.findStaleBrokers());
+ assertEquals(Optional.of(0), manager.findOneStaleBroker());
manager.fence(0);
- assertEquals(Collections.emptyList(), manager.findStaleBrokers());
+ assertEquals(Optional.empty(), manager.findOneStaleBroker());
iter = manager.unfenced().iterator();
assertEquals(1, iter.next().id());
assertEquals(2, iter.next().id());
assertFalse(iter.hasNext());
time.sleep(20);
- assertEquals(Arrays.asList(1, 2), manager.findStaleBrokers());
- manager.fence(1);
- manager.fence(2);
- assertEquals(Collections.emptyList(), manager.findStaleBrokers());
+ Integer nodeId = 1;
+ while (manager.findOneStaleBroker().isPresent()) {
Review comment:
nit: maybe it's better to unroll this to make the expectation explicit.
This test would pass even if there were additional iterations of the loop.
```java
assertEquals(Optional.of(1), manager.findOneStaleBroker());
manager.fence(1);
assertEquals(Optional.of(2), manager.findOneStaleBroker());
manager.fence(2);
```
Ditto below.
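The unrolled form above is the most direct fix. If a loop is still preferred, one hedged alternative is to record the order in which brokers are fenced and compare it against an exact list, which also fails when the loop runs extra iterations. `StaleDrain.drain` and its `maxSteps` guard are hypothetical helpers, not part of the patch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.IntConsumer;
import java.util.function.Supplier;

// Hypothetical test helper: drains stale brokers one at a time and records the order.
final class StaleDrain {
    // maxSteps bounds the loop so a broker that never stops being reported as
    // stale shows up as repeated entries instead of hanging the test.
    static List<Integer> drain(Supplier<Optional<Integer>> findOneStaleBroker,
                               IntConsumer fence, int maxSteps) {
        List<Integer> order = new ArrayList<>();
        for (int i = 0; i < maxSteps; i++) {
            Optional<Integer> next = findOneStaleBroker.get();
            if (!next.isPresent()) {
                break;
            }
            order.add(next.get());
            fence.accept(next.get());
        }
        return order;
    }
}
```

Comparing the result with an exact list such as `List.of(1, 2)` pins both the order and the number of fencings.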