This is an automated email from the ASF dual-hosted git repository.
cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 2cd96f0 MINOR: some code cleanups in the controller
2cd96f0 is described below
commit 2cd96f0e64f8a4f4b74e8049a6c527a990cb4777
Author: Colin P. Mccabe <[email protected]>
AuthorDate: Sat Nov 27 16:05:37 2021 -0800
MINOR: some code cleanups in the controller
---
.../kafka/controller/ControllerPurgatory.java | 10 ++++----
.../apache/kafka/controller/QuorumController.java | 29 ++++++++++------------
.../apache/kafka/controller/SnapshotGenerator.java | 4 ---
.../kafka/controller/ControllerPurgatoryTest.java | 10 ++++----
.../kafka/controller/SnapshotGeneratorTest.java | 5 +---
5 files changed, 24 insertions(+), 34 deletions(-)
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ControllerPurgatory.java b/metadata/src/main/java/org/apache/kafka/controller/ControllerPurgatory.java
index ee6c1d1..1add108 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ControllerPurgatory.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ControllerPurgatory.java
@@ -21,7 +21,7 @@ import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map.Entry;
-import java.util.Optional;
+import java.util.OptionalLong;
import java.util.TreeMap;
/**
@@ -98,11 +98,11 @@ class ControllerPurgatory {
* Get the offset of the highest pending event, or empty if there are no pending
* events.
*/
- Optional<Long> highestPendingOffset() {
+ OptionalLong highestPendingOffset() {
if (pending.isEmpty()) {
- return Optional.empty();
+ return OptionalLong.empty();
} else {
- return Optional.of(pending.lastKey());
+ return OptionalLong.of(pending.lastKey());
}
}
-}
\ No newline at end of file
+}
diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
index cdb35c8..662da8a 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -59,7 +59,6 @@ import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.quota.ClientQuotaAlteration;
import org.apache.kafka.common.quota.ClientQuotaEntity;
import org.apache.kafka.common.requests.ApiError;
-import org.apache.kafka.common.utils.ExponentialBackoff;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
@@ -322,7 +321,7 @@ public final class QuorumController implements Controller {
}
private Throwable handleEventException(String name,
- Optional<Long>
startProcessingTimeNs,
+ OptionalLong startProcessingTimeNs,
Throwable exception) {
if (!startProcessingTimeNs.isPresent()) {
log.info("unable to start processing {} because of {}.", name,
@@ -334,7 +333,7 @@ public final class QuorumController implements Controller {
}
}
long endProcessingTime = time.nanoseconds();
- long deltaNs = endProcessingTime - startProcessingTimeNs.get();
+ long deltaNs = endProcessingTime - startProcessingTimeNs.getAsLong();
long deltaUs = MICROSECONDS.convert(deltaNs, NANOSECONDS);
if (exception instanceof ApiException) {
log.info("{}: failed with {} in {} us", name,
@@ -357,7 +356,7 @@ public final class QuorumController implements Controller {
private final String name;
private final Runnable handler;
private final long eventCreatedTimeNs = time.nanoseconds();
- private Optional<Long> startProcessingTimeNs = Optional.empty();
+ private OptionalLong startProcessingTimeNs = OptionalLong.empty();
ControlEvent(String name, Runnable handler) {
this.name = name;
@@ -368,10 +367,10 @@ public final class QuorumController implements Controller {
public void run() throws Exception {
long now = time.nanoseconds();
controllerMetrics.updateEventQueueTime(NANOSECONDS.toMillis(now -
eventCreatedTimeNs));
- startProcessingTimeNs = Optional.of(now);
+ startProcessingTimeNs = OptionalLong.of(now);
log.debug("Executing {}.", this);
handler.run();
- handleEventEnd(this.toString(), startProcessingTimeNs.get());
+ handleEventEnd(this.toString(), startProcessingTimeNs.getAsLong());
}
@Override
@@ -395,7 +394,6 @@ public final class QuorumController implements Controller {
private static final int MAX_BATCHES_PER_GENERATE_CALL = 10;
class SnapshotGeneratorManager implements Runnable {
- private final ExponentialBackoff exponentialBackoff = new
ExponentialBackoff(10, 2, 5000, 0);
private SnapshotGenerator generator = null;
void createSnapshotGenerator(long committedOffset, int committedEpoch,
long committedTimestamp) {
@@ -420,7 +418,6 @@ public final class QuorumController implements Controller {
logContext,
writer.get(),
MAX_BATCHES_PER_GENERATE_CALL,
- exponentialBackoff,
Arrays.asList(
new Section("features",
featureControl.iterator(committedOffset)),
new Section("cluster",
clusterControl.iterator(committedOffset)),
@@ -504,7 +501,7 @@ public final class QuorumController implements Controller {
private final CompletableFuture<T> future;
private final Supplier<T> handler;
private final long eventCreatedTimeNs = time.nanoseconds();
- private Optional<Long> startProcessingTimeNs = Optional.empty();
+ private OptionalLong startProcessingTimeNs = OptionalLong.empty();
ControllerReadEvent(String name, Supplier<T> handler) {
this.name = name;
@@ -520,9 +517,9 @@ public final class QuorumController implements Controller {
public void run() throws Exception {
long now = time.nanoseconds();
controllerMetrics.updateEventQueueTime(NANOSECONDS.toMillis(now -
eventCreatedTimeNs));
- startProcessingTimeNs = Optional.of(now);
+ startProcessingTimeNs = OptionalLong.of(now);
T value = handler.get();
- handleEventEnd(this.toString(), startProcessingTimeNs.get());
+ handleEventEnd(this.toString(), startProcessingTimeNs.getAsLong());
future.complete(value);
}
@@ -592,7 +589,7 @@ public final class QuorumController implements Controller {
private final CompletableFuture<T> future;
private final ControllerWriteOperation<T> op;
private final long eventCreatedTimeNs = time.nanoseconds();
- private Optional<Long> startProcessingTimeNs = Optional.empty();
+ private OptionalLong startProcessingTimeNs = OptionalLong.empty();
private ControllerResultAndOffset<T> resultAndOffset;
ControllerWriteEvent(String name, ControllerWriteOperation<T> op) {
@@ -614,14 +611,14 @@ public final class QuorumController implements Controller {
if (controllerEpoch == -1) {
throw newNotControllerException();
}
- startProcessingTimeNs = Optional.of(now);
+ startProcessingTimeNs = OptionalLong.of(now);
ControllerResult<T> result = op.generateRecordsAndResult();
if (result.records().isEmpty()) {
op.processBatchEndOffset(writeOffset);
// If the operation did not return any records, then it was actually just
// a read after all, and not a read + write. However, this read was done
// from the latest in-memory state, which might contain uncommitted data.
- Optional<Long> maybeOffset = purgatory.highestPendingOffset();
+ OptionalLong maybeOffset = purgatory.highestPendingOffset();
if (!maybeOffset.isPresent()) {
// If the purgatory is empty, there are no pending operations and no
// uncommitted state. We can return immediately.
@@ -633,7 +630,7 @@ public final class QuorumController implements Controller {
}
// If there are operations in the purgatory, we want to wait for the latest
// one to complete before returning our result to the user.
- resultAndOffset =
ControllerResultAndOffset.of(maybeOffset.get(), result);
+ resultAndOffset =
ControllerResultAndOffset.of(maybeOffset.getAsLong(), result);
log.debug("Read-only operation {} will be completed when the
log " +
"reaches offset {}", this, resultAndOffset.offset());
} else {
@@ -668,7 +665,7 @@ public final class QuorumController implements Controller {
@Override
public void complete(Throwable exception) {
if (exception == null) {
- handleEventEnd(this.toString(), startProcessingTimeNs.get());
+ handleEventEnd(this.toString(),
startProcessingTimeNs.getAsLong());
future.complete(resultAndOffset.response());
} else {
future.completeExceptionally(
diff --git a/metadata/src/main/java/org/apache/kafka/controller/SnapshotGenerator.java b/metadata/src/main/java/org/apache/kafka/controller/SnapshotGenerator.java
index df4bc61..d34696e 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/SnapshotGenerator.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/SnapshotGenerator.java
@@ -22,7 +22,6 @@ import java.util.Iterator;
import java.util.List;
import java.util.OptionalLong;
-import org.apache.kafka.common.utils.ExponentialBackoff;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.snapshot.SnapshotWriter;
@@ -51,7 +50,6 @@ final class SnapshotGenerator {
private final Logger log;
private final SnapshotWriter<ApiMessageAndVersion> writer;
private final int maxBatchesPerGenerateCall;
- private final ExponentialBackoff exponentialBackoff;
private final List<Section> sections;
private final Iterator<Section> sectionIterator;
private Iterator<List<ApiMessageAndVersion>> batchIterator;
@@ -62,12 +60,10 @@ final class SnapshotGenerator {
SnapshotGenerator(LogContext logContext,
SnapshotWriter<ApiMessageAndVersion> writer,
int maxBatchesPerGenerateCall,
- ExponentialBackoff exponentialBackoff,
List<Section> sections) {
this.log = logContext.logger(SnapshotGenerator.class);
this.writer = writer;
this.maxBatchesPerGenerateCall = maxBatchesPerGenerateCall;
- this.exponentialBackoff = exponentialBackoff;
this.sections = sections;
this.sectionIterator = this.sections.iterator();
this.batchIterator = Collections.emptyIterator();
diff --git a/metadata/src/test/java/org/apache/kafka/controller/ControllerPurgatoryTest.java b/metadata/src/test/java/org/apache/kafka/controller/ControllerPurgatoryTest.java
index 57953e1..6eaf182 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/ControllerPurgatoryTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/ControllerPurgatoryTest.java
@@ -17,7 +17,7 @@
package org.apache.kafka.controller;
-import java.util.Optional;
+import java.util.OptionalLong;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import org.junit.jupiter.api.Test;
@@ -55,18 +55,18 @@ public class ControllerPurgatoryTest {
SampleDeferredEvent event2 = new SampleDeferredEvent();
SampleDeferredEvent event3 = new SampleDeferredEvent();
purgatory.add(1, event1);
- assertEquals(Optional.of(1L), purgatory.highestPendingOffset());
+ assertEquals(OptionalLong.of(1L), purgatory.highestPendingOffset());
purgatory.add(1, event2);
- assertEquals(Optional.of(1L), purgatory.highestPendingOffset());
+ assertEquals(OptionalLong.of(1L), purgatory.highestPendingOffset());
purgatory.add(3, event3);
- assertEquals(Optional.of(3L), purgatory.highestPendingOffset());
+ assertEquals(OptionalLong.of(3L), purgatory.highestPendingOffset());
purgatory.completeUpTo(2);
assertTrue(event1.future.isDone());
assertTrue(event2.future.isDone());
assertFalse(event3.future.isDone());
purgatory.completeUpTo(4);
assertTrue(event3.future.isDone());
- assertEquals(Optional.empty(), purgatory.highestPendingOffset());
+ assertEquals(OptionalLong.empty(), purgatory.highestPendingOffset());
}
@Test
diff --git a/metadata/src/test/java/org/apache/kafka/controller/SnapshotGeneratorTest.java b/metadata/src/test/java/org/apache/kafka/controller/SnapshotGeneratorTest.java
index f6c836c..2c61dbc 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/SnapshotGeneratorTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/SnapshotGeneratorTest.java
@@ -23,7 +23,6 @@ import org.apache.kafka.common.memory.MemoryPool;
import org.apache.kafka.common.metadata.ConfigRecord;
import org.apache.kafka.common.metadata.TopicRecord;
import org.apache.kafka.common.record.CompressionType;
-import org.apache.kafka.common.utils.ExponentialBackoff;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.controller.SnapshotGenerator.Section;
@@ -73,14 +72,12 @@ public class SnapshotGeneratorTest {
@Test
public void testGenerateBatches() throws Exception {
SnapshotWriter<ApiMessageAndVersion> writer =
createSnapshotWriter(123, 0);
- ExponentialBackoff exponentialBackoff =
- new ExponentialBackoff(100, 2, 400, 0.0);
List<Section> sections = Arrays.asList(new Section("replication",
Arrays.asList(BATCHES.get(0), BATCHES.get(1),
BATCHES.get(2)).iterator()),
new Section("configuration",
Arrays.asList(BATCHES.get(3), BATCHES.get(4)).iterator()));
SnapshotGenerator generator = new SnapshotGenerator(new LogContext(),
- writer, 2, exponentialBackoff, sections);
+ writer, 2, sections);
assertFalse(writer.isFrozen());
assertEquals(123L, generator.lastContainedLogOffset());
assertEquals(writer, generator.writer());