This is an automated email from the ASF dual-hosted git repository.

jsancio pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 7dc17908de5 KAFKA-14300; Generate snapshot after repeated controller resign (#12747)
7dc17908de5 is described below

commit 7dc17908de540b461b67b71652cef652adc488b0
Author: José Armando García Sancio <jsan...@users.noreply.github.com>
AuthorDate: Tue Oct 18 15:09:20 2022 -0700

    KAFKA-14300; Generate snapshot after repeated controller resign (#12747)
    
    Setting the `committedBytesSinceLastSnapshot` to 0 when resigning can cause
    the controller to not generate a snapshot after `snapshotMaxNewRecordBytes`
    committed bytes have been replayed.
    
    This change fixes that by simply not resetting the counter during
    resignation. This is correct because the counter tracks the number of
    committed bytes replayed but not included in the latest snapshot. In other
    words, reverting the last committed state does not invalidate this value.
    
    Reviewers: Colin Patrick McCabe <cmcc...@apache.org>
---
 .../apache/kafka/controller/QuorumController.java  | 59 +++++++++++++------
 .../apache/kafka/timeline/SnapshotRegistry.java    |  2 +-
 .../kafka/controller/QuorumControllerTest.java     | 67 ++++++++++++++++++++++
 .../org/apache/kafka/metalog/LocalLogManager.java  |  3 +
 4 files changed, 111 insertions(+), 20 deletions(-)

diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
index d132a4b6be1..9aaea73082c 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -980,11 +980,15 @@ public final class QuorumController implements Controller 
{
                                 i++;
                             }
                         }
-                        updateLastCommittedState(offset, epoch, 
batch.appendTimestamp());
-                        processedRecordsSize += batch.sizeInBytes();
+                        updateLastCommittedState(
+                            offset,
+                            epoch,
+                            batch.appendTimestamp(),
+                            committedBytesSinceLastSnapshot + 
batch.sizeInBytes()
+                        );
                     }
 
-                    maybeGenerateSnapshot(processedRecordsSize);
+                    maybeGenerateSnapshot();
                 } finally {
                     reader.close();
                 }
@@ -1039,10 +1043,10 @@ public final class QuorumController implements 
Controller {
                     updateLastCommittedState(
                         reader.lastContainedLogOffset(),
                         reader.lastContainedLogEpoch(),
-                        reader.lastContainedLogTimestamp()
+                        reader.lastContainedLogTimestamp(),
+                        0
                     );
                     snapshotRegistry.getOrCreateSnapshot(lastCommittedOffset);
-                    newBytesSinceLastSnapshot = 0L;
                     authorizer.ifPresent(a -> 
a.loadSnapshot(aclControlManager.idToAcl()));
                 } finally {
                     reader.close();
@@ -1194,10 +1198,16 @@ public final class QuorumController implements 
Controller {
         }
     }
 
-    private void updateLastCommittedState(long offset, int epoch, long 
timestamp) {
+    private void updateLastCommittedState(
+        long offset,
+        int epoch,
+        long timestamp,
+        long bytesSinceLastSnapshot
+    ) {
         lastCommittedOffset = offset;
         lastCommittedEpoch = epoch;
         lastCommittedTimestamp = timestamp;
+        committedBytesSinceLastSnapshot = bytesSinceLastSnapshot;
 
         controllerMetrics.setLastCommittedRecordOffset(offset);
         if (!isActiveController()) {
@@ -1223,7 +1233,6 @@ public final class QuorumController implements Controller 
{
             }
             snapshotRegistry.revertToSnapshot(lastCommittedOffset);
             authorizer.ifPresent(a -> 
a.loadSnapshot(aclControlManager.idToAcl()));
-            newBytesSinceLastSnapshot = 0L;
             updateWriteOffset(-1);
             clusterControl.deactivate();
             cancelMaybeFenceReplicas();
@@ -1445,9 +1454,8 @@ public final class QuorumController implements Controller 
{
         }
     }
 
-    private void maybeGenerateSnapshot(long batchSizeInBytes) {
-        newBytesSinceLastSnapshot += batchSizeInBytes;
-        if (newBytesSinceLastSnapshot >= snapshotMaxNewRecordBytes &&
+    private void maybeGenerateSnapshot() {
+        if (committedBytesSinceLastSnapshot >= snapshotMaxNewRecordBytes &&
             snapshotGeneratorManager.generator == null
         ) {
             if (!isActiveController()) {
@@ -1457,11 +1465,20 @@ public final class QuorumController implements 
Controller {
                 snapshotRegistry.getOrCreateSnapshot(lastCommittedOffset);
             }
 
-            log.info("Generating a snapshot that includes (epoch={}, 
offset={}) after {} committed bytes since the last snapshot because, {}.",
-                lastCommittedEpoch, lastCommittedOffset, 
newBytesSinceLastSnapshot, SnapshotReason.MaxBytesExceeded);
+            log.info(
+                "Generating a snapshot that includes (epoch={}, offset={}) 
after {} committed bytes since the last snapshot because, {}.",
+                lastCommittedEpoch,
+                lastCommittedOffset,
+                committedBytesSinceLastSnapshot,
+                SnapshotReason.MaxBytesExceeded
+            );
 
-            
snapshotGeneratorManager.createSnapshotGenerator(lastCommittedOffset, 
lastCommittedEpoch, lastCommittedTimestamp);
-            newBytesSinceLastSnapshot = 0;
+            snapshotGeneratorManager.createSnapshotGenerator(
+                lastCommittedOffset,
+                lastCommittedEpoch,
+                lastCommittedTimestamp
+            );
+            committedBytesSinceLastSnapshot = 0;
         }
     }
 
@@ -1472,8 +1489,7 @@ public final class QuorumController implements Controller 
{
         snapshotGeneratorManager.cancel();
         snapshotRegistry.reset();
 
-        newBytesSinceLastSnapshot = 0;
-        updateLastCommittedState(-1, -1, -1);
+        updateLastCommittedState(-1, -1, -1, 0);
     }
 
     /**
@@ -1646,7 +1662,7 @@ public final class QuorumController implements Controller 
{
     /**
      * Number of bytes processed through handling commits since the last 
snapshot was generated.
      */
-    private long newBytesSinceLastSnapshot = 0;
+    private long committedBytesSinceLastSnapshot = 0;
 
     /**
      * How long to delay partition leader balancing operations.
@@ -2101,8 +2117,13 @@ public final class QuorumController implements 
Controller {
         CompletableFuture<Long> future = new CompletableFuture<>();
         appendControlEvent("beginWritingSnapshot", () -> {
             if (snapshotGeneratorManager.generator == null) {
-                log.info("Generating a snapshot that includes (epoch={}, 
offset={}) after {} committed bytes since the last snapshot because, {}.",
-                    lastCommittedEpoch, lastCommittedOffset, 
newBytesSinceLastSnapshot, SnapshotReason.UnknownReason);
+                log.info(
+                    "Generating a snapshot that includes (epoch={}, offset={}) 
after {} committed bytes since the last snapshot because, {}.",
+                    lastCommittedEpoch,
+                    lastCommittedOffset,
+                    committedBytesSinceLastSnapshot,
+                    SnapshotReason.UnknownReason
+                );
                 snapshotGeneratorManager.createSnapshotGenerator(
                     lastCommittedOffset,
                     lastCommittedEpoch,
diff --git a/metadata/src/main/java/org/apache/kafka/timeline/SnapshotRegistry.java b/metadata/src/main/java/org/apache/kafka/timeline/SnapshotRegistry.java
index 49d62fd2117..0df9c8b8f56 100644
--- a/metadata/src/main/java/org/apache/kafka/timeline/SnapshotRegistry.java
+++ b/metadata/src/main/java/org/apache/kafka/timeline/SnapshotRegistry.java
@@ -237,7 +237,7 @@ public class SnapshotRegistry {
         } else {
             snapshot.erase();
         }
-        log.debug("Deleting snapshot {}", snapshot.epoch());
+        log.debug("Deleting in-memory snapshot {}", snapshot.epoch());
         snapshots.remove(snapshot.epoch(), snapshot);
     }
 
diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
index a5e7258a724..90807a2e2a7 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
@@ -821,6 +821,73 @@ public class QuorumControllerTest {
         }
     }
 
+    @Test
+    public void testSnapshotAfterRepeatedResign() throws Throwable {
+        final int numBrokers = 4;
+        final int maxNewRecordBytes = 1000;
+        Map<Integer, Long> brokerEpochs = new HashMap<>();
+        try (
+            LocalLogManagerTestEnv logEnv = new 
LocalLogManagerTestEnv.Builder(1).
+                build();
+            QuorumControllerTestEnv controlEnv = new 
QuorumControllerTestEnv.Builder(logEnv).
+                setControllerBuilderInitializer(controllerBuilder -> {
+                    controllerBuilder.setConfigSchema(SCHEMA);
+                    
controllerBuilder.setSnapshotMaxNewRecordBytes(maxNewRecordBytes);
+                }).
+                build();
+        ) {
+            QuorumController active = controlEnv.activeController();
+            for (int i = 0; i < numBrokers; i++) {
+                BrokerRegistrationReply reply = 
active.registerBroker(ANONYMOUS_CONTEXT,
+                    new BrokerRegistrationRequestData().
+                        setBrokerId(i).
+                        setRack(null).
+                        setClusterId(active.clusterId()).
+                        
setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, 
MetadataVersion.IBP_3_3_IV3)).
+                        
setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwB" + i)).
+                        setListeners(new ListenerCollection(Arrays.asList(new 
Listener().
+                            setName("PLAINTEXT").setHost("localhost").
+                            setPort(9092 + i)).iterator()))).get();
+                brokerEpochs.put(i, reply.epoch());
+                assertEquals(new BrokerHeartbeatReply(true, false, false, 
false),
+                    active.processBrokerHeartbeat(ANONYMOUS_CONTEXT, new 
BrokerHeartbeatRequestData().
+                        
setWantFence(false).setBrokerEpoch(brokerEpochs.get(i)).
+                        
setBrokerId(i).setCurrentMetadataOffset(100000L)).get());
+            }
+
+            assertTrue(logEnv.appendedBytes() < maxNewRecordBytes,
+                String.format("%s appended bytes is not less than %s max new 
record bytes",
+                    logEnv.appendedBytes(),
+                    maxNewRecordBytes));
+
+            // Keep creating topic and resign leader until we reached the max 
bytes limit
+            int counter = 0;
+            while (logEnv.appendedBytes() < maxNewRecordBytes) {
+                active = controlEnv.activeController();
+
+                counter += 1;
+                String topicName = String.format("foo-%s", counter);
+                active.createTopics(ANONYMOUS_CONTEXT, new 
CreateTopicsRequestData().setTopics(
+                        new CreatableTopicCollection(Collections.singleton(
+                            new 
CreatableTopic().setName(topicName).setNumPartitions(-1).
+                                setReplicationFactor((short) -1).
+                                setAssignments(new 
CreatableReplicaAssignmentCollection(
+                                    Arrays.asList(new 
CreatableReplicaAssignment().
+                                        setPartitionIndex(0).
+                                        setBrokerIds(Arrays.asList(0, 1, 2)),
+                                    new CreatableReplicaAssignment().
+                                        setPartitionIndex(1).
+                                        setBrokerIds(Arrays.asList(1, 2, 0))).
+                                            iterator()))).iterator())),
+                    Collections.singleton(topicName)).get(60, 
TimeUnit.SECONDS);
+
+                LocalLogManager activeLocalLogManager = 
logEnv.logManagers().get(active.nodeId());
+                
activeLocalLogManager.resign(activeLocalLogManager.leaderAndEpoch().epoch());
+            }
+            logEnv.waitForLatestSnapshot();
+        }
+    }
+
     private SnapshotReader<ApiMessageAndVersion> 
createSnapshotReader(RawSnapshotReader reader) {
         return RecordsSnapshotReader.of(
             reader,
diff --git a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java
index e24d86bd873..6457ede9ba2 100644
--- a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java
+++ b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java
@@ -406,7 +406,10 @@ public final class LocalLogManager implements 
RaftClient<ApiMessageAndVersion>,
         }
 
         void handleLeaderChange(long offset, LeaderAndEpoch leader) {
+            // Simulate KRaft implementation by first bumping the epoch before 
assigning a leader
+            listener.handleLeaderChange(new 
LeaderAndEpoch(OptionalInt.empty(), leader.epoch()));
             listener.handleLeaderChange(leader);
+
             notifiedLeader = leader;
             this.offset = offset;
         }

Reply via email to