This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 81b71c06f3 KAFKA-14204: QuorumController must correctly handle overly large batches (#12595)
81b71c06f3 is described below

commit 81b71c06f300daf353cef04653d59385db561b38
Author: Colin Patrick McCabe <cmcc...@apache.org>
AuthorDate: Thu Sep 8 14:21:29 2022 -0700

    KAFKA-14204: QuorumController must correctly handle overly large batches (#12595)
    
    Originally, the QuorumController did not try to limit the number of records in a batch that it sent
    to the Raft layer. This caused two problems. Firstly, we were not correctly handling the exception
    that was thrown by the Raft layer when a batch of records was too large to apply atomically. This
    happened because the Raft layer threw an exception which was a subclass of ApiException. Secondly,
    by letting the Raft layer split non-atomic batches, we were not able to create snapshots at each of
    the splits. This led to O(N) behavior during controller failovers.
    
    This PR fixes both of these issues by limiting the number of records in a batch. Atomic batches
    that are too large will fail with a RuntimeException which will cause the active controller to
    become inactive and revert to the last committed state. Non-atomic batches will be split into
    multiple batches with a fixed number of records in each.
    
    Reviewers: Luke Chen <show...@gmail.com>, José Armando García Sancio <jsan...@gmail.com>
---
 .../apache/kafka/controller/QuorumController.java  | 161 +++++++++++++++------
 .../kafka/controller/QuorumControllerTest.java     |  46 ++++++
 2 files changed, 165 insertions(+), 42 deletions(-)

diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
index e669817dd2..7ad601b007 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -119,6 +119,7 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
+import java.util.function.Function;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
@@ -147,6 +148,8 @@ import static java.util.concurrent.TimeUnit.NANOSECONDS;
  * the controller can fully initialize.
  */
 public final class QuorumController implements Controller {
+    private final static int MAX_RECORDS_PER_BATCH = 10000;
+
     /**
      * A builder class which creates the QuorumController.
      */
@@ -175,6 +178,7 @@ public final class QuorumController implements Controller {
         private Optional<ClusterMetadataAuthorizer> authorizer = 
Optional.empty();
         private Map<String, Object> staticConfig = Collections.emptyMap();
         private BootstrapMetadata bootstrapMetadata = null;
+        private int maxRecordsPerBatch = MAX_RECORDS_PER_BATCH;
 
         public Builder(int nodeId, String clusterId) {
             this.nodeId = nodeId;
@@ -270,6 +274,11 @@ public final class QuorumController implements Controller {
             return this;
         }
 
+        public Builder setMaxRecordsPerBatch(int maxRecordsPerBatch) {
+            this.maxRecordsPerBatch = maxRecordsPerBatch;
+            return this;
+        }
+
         public Builder setCreateTopicPolicy(Optional<CreateTopicPolicy> 
createTopicPolicy) {
             this.createTopicPolicy = createTopicPolicy;
             return this;
@@ -347,7 +356,8 @@ public final class QuorumController implements Controller {
                     configurationValidator,
                     authorizer,
                     staticConfig,
-                    bootstrapMetadata
+                    bootstrapMetadata,
+                    maxRecordsPerBatch
                 );
             } catch (Exception e) {
                 Utils.closeQuietly(queue, "event queue");
@@ -776,37 +786,38 @@ public final class QuorumController implements Controller {
                         "reaches offset {}", this, resultAndOffset.offset());
                 }
             } else {
-                // Start by trying to apply the record to our in-memory state. 
This should always
-                // succeed; if it does not, that's a fatal error. It is 
important to do this before
-                // scheduling the record for Raft replication.
-                int i = 1;
-                for (ApiMessageAndVersion message : result.records()) {
-                    try {
-                        replay(message.message(), Optional.empty(), 
writeOffset + result.records().size());
-                    } catch (Throwable e) {
-                        String failureMessage = String.format("Unable to apply 
%s record, which was " +
-                            "%d of %d record(s) in the batch following last 
writeOffset %d.",
-                            message.message().getClass().getSimpleName(), i, 
result.records().size(),
-                            writeOffset);
-                        throw fatalFaultHandler.handleFault(failureMessage, e);
-                    }
-                    i++;
-                }
-
-                // If the operation returned a batch of records, and those 
records could be applied,
-                // they need to be written before we can return our result to 
the user.  Here, we
-                // hand off the batch of records to the raft client.  They 
will be written out
-                // asynchronously.
-                final long offset;
-                if (result.isAtomic()) {
-                    offset = raftClient.scheduleAtomicAppend(controllerEpoch, 
result.records());
-                } else {
-                    offset = raftClient.scheduleAppend(controllerEpoch, 
result.records());
-                }
+                // Pass the records to the Raft layer. This will start the 
process of committing
+                // them to the log.
+                long offset = appendRecords(log, result, maxRecordsPerBatch,
+                    new Function<List<ApiMessageAndVersion>, Long>() {
+                        private long prevEndOffset = writeOffset;
+
+                        @Override
+                        public Long apply(List<ApiMessageAndVersion> records) {
+                            // Start by trying to apply the record to our 
in-memory state. This should always
+                            // succeed; if it does not, that's a fatal error. 
It is important to do this before
+                            // scheduling the record for Raft replication.
+                            int i = 1;
+                            for (ApiMessageAndVersion message : records) {
+                                try {
+                                    replay(message.message(), 
Optional.empty(), prevEndOffset + records.size());
+                                } catch (Throwable e) {
+                                    String failureMessage = 
String.format("Unable to apply %s record, which was " +
+                                            "%d of %d record(s) in the batch 
following last write offset %d.",
+                                            
message.message().getClass().getSimpleName(), i, records.size(),
+                                            prevEndOffset);
+                                    throw 
fatalFaultHandler.handleFault(failureMessage, e);
+                                }
+                                i++;
+                            }
+                            prevEndOffset = 
raftClient.scheduleAtomicAppend(controllerEpoch, records);
+                            
snapshotRegistry.getOrCreateSnapshot(prevEndOffset);
+                            return prevEndOffset;
+                        }
+                    });
                 op.processBatchEndOffset(offset);
                 updateWriteOffset(offset);
                 resultAndOffset = ControllerResultAndOffset.of(offset, result);
-                snapshotRegistry.getOrCreateSnapshot(offset);
 
                 log.debug("Read-write operation {} will be completed when the 
log " +
                     "reaches offset {}.", this, resultAndOffset.offset());
@@ -844,6 +855,72 @@ public final class QuorumController implements Controller {
         }
     }
 
+    /**
+     * Append records to the Raft log. They will be written out asynchronously.
+     *
+     * @param log                   The log4j logger.
+     * @param result                The controller result we are writing out.
+     * @param maxRecordsPerBatch    The maximum number of records to allow in 
a batch.
+     * @param appender              The callback to invoke for each batch. The 
arguments are last
+     *                              write offset, record list, and the return 
result is the new
+     *                              last write offset.
+     * @return                      The final offset that was returned from 
the Raft layer.
+     */
+    static long appendRecords(
+        Logger log,
+        ControllerResult<?> result,
+        int maxRecordsPerBatch,
+        Function<List<ApiMessageAndVersion>, Long> appender
+    ) {
+        try {
+            List<ApiMessageAndVersion> records = result.records();
+            if (result.isAtomic()) {
+                // If the result must be written out atomically, check that it 
is not too large.
+                // In general, we create atomic batches when it is important 
to commit "all, or
+                // nothing". They are limited in size and must only be used 
when the batch size
+                // is bounded.
+                if (records.size() > maxRecordsPerBatch) {
+                    throw new IllegalStateException("Attempted to atomically 
commit " +
+                            records.size() + " records, but maxRecordsPerBatch 
is " +
+                            maxRecordsPerBatch);
+                }
+                long offset = appender.apply(records);
+                if (log.isTraceEnabled()) {
+                    log.trace("Atomically appended {} record(s) ending with 
offset {}.",
+                            records.size(), offset);
+                }
+                return offset;
+            } else {
+                // If the result is non-atomic, then split it into as many 
batches as needed.
+                // The appender callback will create an in-memory snapshot for 
each batch,
+                // since we might need to revert to any of them. We will only 
return the final
+                // offset of the last batch, however.
+                int startIndex = 0, numBatches = 0;
+                while (true) {
+                    numBatches++;
+                    int endIndex = startIndex + maxRecordsPerBatch;
+                    if (endIndex > records.size()) {
+                        long offset = 
appender.apply(records.subList(startIndex, records.size()));
+                        if (log.isTraceEnabled()) {
+                            log.trace("Appended {} record(s) in {} batch(es), 
ending with offset {}.",
+                                    records.size(), numBatches, offset);
+                        }
+                        return offset;
+                    } else {
+                        appender.apply(records.subList(startIndex, endIndex));
+                    }
+                    startIndex += maxRecordsPerBatch;
+                }
+            }
+        } catch (ApiException e) {
+            // If the Raft client throws a subclass of ApiException, we need 
to convert it into a
+            // RuntimeException so that it will be handled as the unexpected 
exception that it is.
+            // ApiExceptions are reserved for expected errors such as 
incorrect uses of controller
+            // APIs, permission errors, NotControllerException, etc. etc.
+            throw new RuntimeException(e);
+        }
+    }
+
     <T> CompletableFuture<T> appendWriteEvent(String name,
                                               OptionalLong deadlineNs,
                                               ControllerWriteOperation<T> op) {
@@ -1148,19 +1225,12 @@ public final class QuorumController implements Controller {
             controllerMetrics.setActive(false);
             purgatory.failAll(newNotControllerException());
 
-            if (snapshotRegistry.hasSnapshot(lastCommittedOffset)) {
-                snapshotRegistry.revertToSnapshot(lastCommittedOffset);
-                authorizer.ifPresent(a -> 
a.loadSnapshot(aclControlManager.idToAcl()));
-            } else {
-                log.info("Unable to find last committed offset {} in snapshot 
registry; resetting " +
-                         "to empty state.", lastCommittedOffset);
-                resetToEmptyState();
-                authorizer.ifPresent(a -> 
a.loadSnapshot(Collections.emptyMap()));
-                needToCompleteAuthorizerLoad = authorizer.isPresent();
-                raftClient.unregister(metaLogListener);
-                metaLogListener = new QuorumMetaLogListener();
-                raftClient.register(metaLogListener);
+            if (!snapshotRegistry.hasSnapshot(lastCommittedOffset)) {
+                throw new RuntimeException("Unable to find last committed 
offset " +
+                        lastCommittedEpoch + " in snapshot registry.");
             }
+            snapshotRegistry.revertToSnapshot(lastCommittedOffset);
+            authorizer.ifPresent(a -> 
a.loadSnapshot(aclControlManager.idToAcl()));
             newBytesSinceLastSnapshot = 0L;
             updateWriteOffset(-1);
             clusterControl.deactivate();
@@ -1625,6 +1695,11 @@ public final class QuorumController implements Controller {
      */
     private final BootstrapMetadata bootstrapMetadata;
 
+    /**
+     * The maximum number of records per batch to allow.
+     */
+    private final int maxRecordsPerBatch;
+
     private QuorumController(
         FaultHandler fatalFaultHandler,
         FaultHandler metadataFaultHandler,
@@ -1649,7 +1724,8 @@ public final class QuorumController implements Controller {
         ConfigurationValidator configurationValidator,
         Optional<ClusterMetadataAuthorizer> authorizer,
         Map<String, Object> staticConfig,
-        BootstrapMetadata bootstrapMetadata
+        BootstrapMetadata bootstrapMetadata,
+        int maxRecordsPerBatch
     ) {
         this.fatalFaultHandler = fatalFaultHandler;
         this.metadataFaultHandler = metadataFaultHandler;
@@ -1719,6 +1795,7 @@ public final class QuorumController implements Controller {
                 build();
         this.raftClient = raftClient;
         this.bootstrapMetadata = bootstrapMetadata;
+        this.maxRecordsPerBatch = maxRecordsPerBatch;
         this.metaLogListener = new QuorumMetaLogListener();
         this.curClaimEpoch = -1;
         this.needToCompleteAuthorizerLoad = authorizer.isPresent();
diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
index 89ef0b083d..a1a3741b6a 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
@@ -39,6 +39,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import java.util.stream.StreamSupport;
@@ -47,6 +48,7 @@ import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.errors.BrokerIdNotRegisteredException;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
 import org.apache.kafka.common.message.RequestHeaderData;
+import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord;
 import org.apache.kafka.common.metadata.ConfigRecord;
 import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
 import org.apache.kafka.common.security.auth.KafkaPrincipal;
@@ -109,6 +111,8 @@ import org.apache.kafka.test.TestUtils;
 import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import static java.util.function.Function.identity;
 import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.SET;
@@ -129,6 +133,8 @@ import static org.junit.jupiter.api.Assertions.fail;
 
 @Timeout(value = 40)
 public class QuorumControllerTest {
+    private final static Logger log = 
LoggerFactory.getLogger(QuorumControllerTest.class);
+
     static final BootstrapMetadata SIMPLE_BOOTSTRAP = BootstrapMetadata.
             fromVersion(MetadataVersion.IBP_3_3_IV3, "test-provided 
bootstrap");
 
@@ -1373,4 +1379,44 @@ public class QuorumControllerTest {
             }
         }
     }
+
+    static class TestAppender implements Function<List<ApiMessageAndVersion>, 
Long>  {
+        private long offset = 0;
+
+        @Override
+        public Long apply(List<ApiMessageAndVersion> apiMessageAndVersions) {
+            for (ApiMessageAndVersion apiMessageAndVersion : 
apiMessageAndVersions) {
+                BrokerRegistrationChangeRecord record =
+                        (BrokerRegistrationChangeRecord) 
apiMessageAndVersion.message();
+                assertEquals((int) offset, record.brokerId());
+                offset++;
+            }
+            return offset;
+        }
+    }
+
+    private static ApiMessageAndVersion rec(int i) {
+        return new ApiMessageAndVersion(new 
BrokerRegistrationChangeRecord().setBrokerId(i),
+                (short) 0);
+    }
+
+    @Test
+    public void testAppendRecords() {
+        TestAppender appender = new TestAppender();
+        assertEquals(5, QuorumController.appendRecords(log,
+            ControllerResult.of(Arrays.asList(rec(0), rec(1), rec(2), rec(3), 
rec(4)), null),
+            2,
+            appender));
+    }
+
+    @Test
+    public void testAppendRecordsAtomically() {
+        TestAppender appender = new TestAppender();
+        assertEquals("Attempted to atomically commit 5 records, but 
maxRecordsPerBatch is 2",
+            assertThrows(IllegalStateException.class, () ->
+                QuorumController.appendRecords(log,
+                        ControllerResult.atomicOf(Arrays.asList(rec(0), 
rec(1), rec(2), rec(3), rec(4)), null),
+                        2,
+                        appender)).getMessage());
+    }
 }

Reply via email to