This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 2cd96f0  MINOR: some code cleanups in the controller
2cd96f0 is described below

commit 2cd96f0e64f8a4f4b74e8049a6c527a990cb4777
Author: Colin P. Mccabe <[email protected]>
AuthorDate: Sat Nov 27 16:05:37 2021 -0800

    MINOR: some code cleanups in the controller
---
 .../kafka/controller/ControllerPurgatory.java      | 10 ++++----
 .../apache/kafka/controller/QuorumController.java  | 29 ++++++++++------------
 .../apache/kafka/controller/SnapshotGenerator.java |  4 ---
 .../kafka/controller/ControllerPurgatoryTest.java  | 10 ++++----
 .../kafka/controller/SnapshotGeneratorTest.java    |  5 +---
 5 files changed, 24 insertions(+), 34 deletions(-)

diff --git a/metadata/src/main/java/org/apache/kafka/controller/ControllerPurgatory.java b/metadata/src/main/java/org/apache/kafka/controller/ControllerPurgatory.java
index ee6c1d1..1add108 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ControllerPurgatory.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ControllerPurgatory.java
@@ -21,7 +21,7 @@ import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map.Entry;
-import java.util.Optional;
+import java.util.OptionalLong;
 import java.util.TreeMap;
 
 /**
@@ -98,11 +98,11 @@ class ControllerPurgatory {
     * Get the offset of the highest pending event, or empty if there are no pending
      * events.
      */
-    Optional<Long> highestPendingOffset() {
+    OptionalLong highestPendingOffset() {
         if (pending.isEmpty()) {
-            return Optional.empty();
+            return OptionalLong.empty();
         } else {
-            return Optional.of(pending.lastKey());
+            return OptionalLong.of(pending.lastKey());
         }
     }
-}
\ No newline at end of file
+}
diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
index cdb35c8..662da8a 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -59,7 +59,6 @@ import org.apache.kafka.common.protocol.ApiMessage;
 import org.apache.kafka.common.quota.ClientQuotaAlteration;
 import org.apache.kafka.common.quota.ClientQuotaEntity;
 import org.apache.kafka.common.requests.ApiError;
-import org.apache.kafka.common.utils.ExponentialBackoff;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
@@ -322,7 +321,7 @@ public final class QuorumController implements Controller {
     }
 
     private Throwable handleEventException(String name,
-                                           Optional<Long> startProcessingTimeNs,
+                                           OptionalLong startProcessingTimeNs,
                                            Throwable exception) {
         if (!startProcessingTimeNs.isPresent()) {
             log.info("unable to start processing {} because of {}.", name,
@@ -334,7 +333,7 @@ public final class QuorumController implements Controller {
             }
         }
         long endProcessingTime = time.nanoseconds();
-        long deltaNs = endProcessingTime - startProcessingTimeNs.get();
+        long deltaNs = endProcessingTime - startProcessingTimeNs.getAsLong();
         long deltaUs = MICROSECONDS.convert(deltaNs, NANOSECONDS);
         if (exception instanceof ApiException) {
             log.info("{}: failed with {} in {} us", name,
@@ -357,7 +356,7 @@ public final class QuorumController implements Controller {
         private final String name;
         private final Runnable handler;
         private final long eventCreatedTimeNs = time.nanoseconds();
-        private Optional<Long> startProcessingTimeNs = Optional.empty();
+        private OptionalLong startProcessingTimeNs = OptionalLong.empty();
 
         ControlEvent(String name, Runnable handler) {
             this.name = name;
@@ -368,10 +367,10 @@ public final class QuorumController implements Controller {
         public void run() throws Exception {
             long now = time.nanoseconds();
             controllerMetrics.updateEventQueueTime(NANOSECONDS.toMillis(now - eventCreatedTimeNs));
-            startProcessingTimeNs = Optional.of(now);
+            startProcessingTimeNs = OptionalLong.of(now);
             log.debug("Executing {}.", this);
             handler.run();
-            handleEventEnd(this.toString(), startProcessingTimeNs.get());
+            handleEventEnd(this.toString(), startProcessingTimeNs.getAsLong());
         }
 
         @Override
@@ -395,7 +394,6 @@ public final class QuorumController implements Controller {
     private static final int MAX_BATCHES_PER_GENERATE_CALL = 10;
 
     class SnapshotGeneratorManager implements Runnable {
-        private final ExponentialBackoff exponentialBackoff = new ExponentialBackoff(10, 2, 5000, 0);
         private SnapshotGenerator generator = null;
 
         void createSnapshotGenerator(long committedOffset, int committedEpoch, long committedTimestamp) {
@@ -420,7 +418,6 @@ public final class QuorumController implements Controller {
                     logContext,
                     writer.get(),
                     MAX_BATCHES_PER_GENERATE_CALL,
-                    exponentialBackoff,
                     Arrays.asList(
                         new Section("features", featureControl.iterator(committedOffset)),
                         new Section("cluster", clusterControl.iterator(committedOffset)),
@@ -504,7 +501,7 @@ public final class QuorumController implements Controller {
         private final CompletableFuture<T> future;
         private final Supplier<T> handler;
         private final long eventCreatedTimeNs = time.nanoseconds();
-        private Optional<Long> startProcessingTimeNs = Optional.empty();
+        private OptionalLong startProcessingTimeNs = OptionalLong.empty();
 
         ControllerReadEvent(String name, Supplier<T> handler) {
             this.name = name;
@@ -520,9 +517,9 @@ public final class QuorumController implements Controller {
         public void run() throws Exception {
             long now = time.nanoseconds();
             controllerMetrics.updateEventQueueTime(NANOSECONDS.toMillis(now - eventCreatedTimeNs));
-            startProcessingTimeNs = Optional.of(now);
+            startProcessingTimeNs = OptionalLong.of(now);
             T value = handler.get();
-            handleEventEnd(this.toString(), startProcessingTimeNs.get());
+            handleEventEnd(this.toString(), startProcessingTimeNs.getAsLong());
             future.complete(value);
         }
 
@@ -592,7 +589,7 @@ public final class QuorumController implements Controller {
         private final CompletableFuture<T> future;
         private final ControllerWriteOperation<T> op;
         private final long eventCreatedTimeNs = time.nanoseconds();
-        private Optional<Long> startProcessingTimeNs = Optional.empty();
+        private OptionalLong startProcessingTimeNs = OptionalLong.empty();
         private ControllerResultAndOffset<T> resultAndOffset;
 
         ControllerWriteEvent(String name, ControllerWriteOperation<T> op) {
@@ -614,14 +611,14 @@ public final class QuorumController implements Controller {
             if (controllerEpoch == -1) {
                 throw newNotControllerException();
             }
-            startProcessingTimeNs = Optional.of(now);
+            startProcessingTimeNs = OptionalLong.of(now);
             ControllerResult<T> result = op.generateRecordsAndResult();
             if (result.records().isEmpty()) {
                 op.processBatchEndOffset(writeOffset);
                 // If the operation did not return any records, then it was actually just
                 // a read after all, and not a read + write.  However, this read was done
                 // from the latest in-memory state, which might contain uncommitted data.
-                Optional<Long> maybeOffset = purgatory.highestPendingOffset();
+                OptionalLong maybeOffset = purgatory.highestPendingOffset();
                 if (!maybeOffset.isPresent()) {
                     // If the purgatory is empty, there are no pending operations and no
                     // uncommitted state.  We can return immediately.
@@ -633,7 +630,7 @@ public final class QuorumController implements Controller {
                 }
                 // If there are operations in the purgatory, we want to wait for the latest
                 // one to complete before returning our result to the user.
-                resultAndOffset = ControllerResultAndOffset.of(maybeOffset.get(), result);
+                resultAndOffset = ControllerResultAndOffset.of(maybeOffset.getAsLong(), result);
                 log.debug("Read-only operation {} will be completed when the log " +
                     "reaches offset {}", this, resultAndOffset.offset());
             } else {
@@ -668,7 +665,7 @@ public final class QuorumController implements Controller {
         @Override
         public void complete(Throwable exception) {
             if (exception == null) {
-                handleEventEnd(this.toString(), startProcessingTimeNs.get());
+                handleEventEnd(this.toString(), startProcessingTimeNs.getAsLong());
                 future.complete(resultAndOffset.response());
             } else {
                 future.completeExceptionally(
diff --git a/metadata/src/main/java/org/apache/kafka/controller/SnapshotGenerator.java b/metadata/src/main/java/org/apache/kafka/controller/SnapshotGenerator.java
index df4bc61..d34696e 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/SnapshotGenerator.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/SnapshotGenerator.java
@@ -22,7 +22,6 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.OptionalLong;
 
-import org.apache.kafka.common.utils.ExponentialBackoff;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
 import org.apache.kafka.snapshot.SnapshotWriter;
@@ -51,7 +50,6 @@ final class SnapshotGenerator {
     private final Logger log;
     private final SnapshotWriter<ApiMessageAndVersion> writer;
     private final int maxBatchesPerGenerateCall;
-    private final ExponentialBackoff exponentialBackoff;
     private final List<Section> sections;
     private final Iterator<Section> sectionIterator;
     private Iterator<List<ApiMessageAndVersion>> batchIterator;
@@ -62,12 +60,10 @@ final class SnapshotGenerator {
     SnapshotGenerator(LogContext logContext,
                       SnapshotWriter<ApiMessageAndVersion> writer,
                       int maxBatchesPerGenerateCall,
-                      ExponentialBackoff exponentialBackoff,
                       List<Section> sections) {
         this.log = logContext.logger(SnapshotGenerator.class);
         this.writer = writer;
         this.maxBatchesPerGenerateCall = maxBatchesPerGenerateCall;
-        this.exponentialBackoff = exponentialBackoff;
         this.sections = sections;
         this.sectionIterator = this.sections.iterator();
         this.batchIterator = Collections.emptyIterator();
diff --git a/metadata/src/test/java/org/apache/kafka/controller/ControllerPurgatoryTest.java b/metadata/src/test/java/org/apache/kafka/controller/ControllerPurgatoryTest.java
index 57953e1..6eaf182 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/ControllerPurgatoryTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/ControllerPurgatoryTest.java
@@ -17,7 +17,7 @@
 
 package org.apache.kafka.controller;
 
-import java.util.Optional;
+import java.util.OptionalLong;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import org.junit.jupiter.api.Test;
@@ -55,18 +55,18 @@ public class ControllerPurgatoryTest {
         SampleDeferredEvent event2 = new SampleDeferredEvent();
         SampleDeferredEvent event3 = new SampleDeferredEvent();
         purgatory.add(1, event1);
-        assertEquals(Optional.of(1L), purgatory.highestPendingOffset());
+        assertEquals(OptionalLong.of(1L), purgatory.highestPendingOffset());
         purgatory.add(1, event2);
-        assertEquals(Optional.of(1L), purgatory.highestPendingOffset());
+        assertEquals(OptionalLong.of(1L), purgatory.highestPendingOffset());
         purgatory.add(3, event3);
-        assertEquals(Optional.of(3L), purgatory.highestPendingOffset());
+        assertEquals(OptionalLong.of(3L), purgatory.highestPendingOffset());
         purgatory.completeUpTo(2);
         assertTrue(event1.future.isDone());
         assertTrue(event2.future.isDone());
         assertFalse(event3.future.isDone());
         purgatory.completeUpTo(4);
         assertTrue(event3.future.isDone());
-        assertEquals(Optional.empty(), purgatory.highestPendingOffset());
+        assertEquals(OptionalLong.empty(), purgatory.highestPendingOffset());
     }
 
     @Test
diff --git a/metadata/src/test/java/org/apache/kafka/controller/SnapshotGeneratorTest.java b/metadata/src/test/java/org/apache/kafka/controller/SnapshotGeneratorTest.java
index f6c836c..2c61dbc 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/SnapshotGeneratorTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/SnapshotGeneratorTest.java
@@ -23,7 +23,6 @@ import org.apache.kafka.common.memory.MemoryPool;
 import org.apache.kafka.common.metadata.ConfigRecord;
 import org.apache.kafka.common.metadata.TopicRecord;
 import org.apache.kafka.common.record.CompressionType;
-import org.apache.kafka.common.utils.ExponentialBackoff;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.controller.SnapshotGenerator.Section;
@@ -73,14 +72,12 @@ public class SnapshotGeneratorTest {
     @Test
     public void testGenerateBatches() throws Exception {
         SnapshotWriter<ApiMessageAndVersion> writer = createSnapshotWriter(123, 0);
-        ExponentialBackoff exponentialBackoff =
-            new ExponentialBackoff(100, 2, 400, 0.0);
         List<Section> sections = Arrays.asList(new Section("replication",
                 Arrays.asList(BATCHES.get(0), BATCHES.get(1), BATCHES.get(2)).iterator()),
             new Section("configuration",
                 Arrays.asList(BATCHES.get(3), BATCHES.get(4)).iterator()));
         SnapshotGenerator generator = new SnapshotGenerator(new LogContext(),
-            writer, 2, exponentialBackoff, sections);
+            writer, 2, sections);
         assertFalse(writer.isFrozen());
         assertEquals(123L, generator.lastContainedLogOffset());
         assertEquals(writer, generator.writer());

Reply via email to