This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 27647c7c7c7 MINOR: Remove the MetaLogShim namings (#20357)
27647c7c7c7 is described below
commit 27647c7c7c784f70979f6b392fb4ae60ce1c6c46
Author: OuO <[email protected]>
AuthorDate: Sat Aug 16 02:02:56 2025 +0800
MINOR: Remove the MetaLogShim namings (#20357)
Correct parameter name from `logManager` to `raftClient` (leftover from
PR #10705)
Reviewers: Chia-Ping Tsai <[email protected]>
---
checkstyle/import-control-metadata.xml | 16 ---
.../apache/kafka/controller/QuorumController.java | 6 +-
.../MockRaftClient.java} | 82 ++++++-------
.../MockRaftClientListener.java} | 6 +-
.../MockRaftClientTest.java} | 57 ++++-----
.../MockRaftClientTestEnv.java} | 80 ++++++-------
.../QuorumControllerMetricsIntegrationTest.java | 19 ++-
.../kafka/controller/QuorumControllerTest.java | 132 ++++++++++-----------
.../kafka/controller/QuorumControllerTestEnv.java | 23 ++--
9 files changed, 201 insertions(+), 220 deletions(-)
diff --git a/checkstyle/import-control-metadata.xml
b/checkstyle/import-control-metadata.xml
index c2660674e32..293801bd75f 100644
--- a/checkstyle/import-control-metadata.xml
+++ b/checkstyle/import-control-metadata.xml
@@ -83,7 +83,6 @@
<allow pkg="org.apache.kafka.metadata" />
<allow pkg="org.apache.kafka.metadata.authorizer" />
<allow pkg="org.apache.kafka.metadata.migration" />
- <allow pkg="org.apache.kafka.metalog" />
<allow pkg="org.apache.kafka.deferred" />
<allow pkg="org.apache.kafka.queue" />
<allow pkg="org.apache.kafka.raft" />
@@ -160,7 +159,6 @@
<allow pkg="org.apache.kafka.common.requests" />
<allow pkg="org.apache.kafka.image" />
<allow pkg="org.apache.kafka.metadata" />
- <allow pkg="org.apache.kafka.metalog" />
<allow pkg="org.apache.kafka.queue" />
<allow pkg="org.apache.kafka.raft" />
<allow pkg="org.apache.kafka.server.authorizer" />
@@ -198,18 +196,4 @@
</subpackage>
</subpackage>
- <subpackage name="metalog">
- <allow class="org.apache.kafka.common.compress.Compression"
exact-match="true" />
- <allow pkg="org.apache.kafka.common.metadata" />
- <allow pkg="org.apache.kafka.common.protocol" />
- <allow pkg="org.apache.kafka.common.record" />
- <allow pkg="org.apache.kafka.metadata" />
- <allow pkg="org.apache.kafka.metalog" />
- <allow pkg="org.apache.kafka.raft" />
- <allow pkg="org.apache.kafka.snapshot" />
- <allow pkg="org.apache.kafka.queue" />
- <allow pkg="org.apache.kafka.server.common" />
- <allow pkg="org.apache.kafka.test" />
- </subpackage>
-
</import-control>
diff --git
a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
index 5f4d6142434..3e1dd69723b 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -260,8 +260,8 @@ public final class QuorumController implements Controller {
return this;
}
- public Builder setRaftClient(RaftClient<ApiMessageAndVersion>
logManager) {
- this.raftClient = logManager;
+ public Builder setRaftClient(RaftClient<ApiMessageAndVersion>
raftClient) {
+ this.raftClient = raftClient;
return this;
}
@@ -1082,7 +1082,7 @@ public final class QuorumController implements Controller
{
@Override
public void beginShutdown() {
- queue.beginShutdown("MetaLogManager.Listener");
+ queue.beginShutdown("QuorumMetaLogListener");
}
private void appendRaftEvent(String name, Runnable runnable) {
diff --git
a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java
b/metadata/src/test/java/org/apache/kafka/controller/MockRaftClient.java
similarity index 91%
rename from metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java
rename to metadata/src/test/java/org/apache/kafka/controller/MockRaftClient.java
index 013d5945d2e..21e3a645530 100644
--- a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/MockRaftClient.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.kafka.metalog;
+package org.apache.kafka.controller;
import org.apache.kafka.common.protocol.ObjectSerializationCache;
import org.apache.kafka.common.utils.BufferSupplier;
@@ -66,9 +66,9 @@ import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
/**
- * The LocalLogManager is a test implementation that relies on the contents of
memory.
+ * The MockRaftClient is a test implementation that relies on the contents of
memory.
*/
-public final class LocalLogManager implements
RaftClient<ApiMessageAndVersion>, AutoCloseable {
+public final class MockRaftClient implements RaftClient<ApiMessageAndVersion>,
AutoCloseable {
interface LocalBatch {
int epoch();
int size();
@@ -158,9 +158,9 @@ public final class LocalLogManager implements
RaftClient<ApiMessageAndVersion>,
private static final Logger log =
LoggerFactory.getLogger(SharedLogData.class);
/**
- * Maps node IDs to the matching log managers.
+ * Maps node IDs to the matching raft clients.
*/
- private final HashMap<Integer, LocalLogManager> logManagers = new
HashMap<>();
+ private final HashMap<Integer, MockRaftClient> raftClients = new
HashMap<>();
/**
* Maps offsets to record batches.
@@ -198,17 +198,17 @@ public final class LocalLogManager implements
RaftClient<ApiMessageAndVersion>,
}
}
- synchronized void registerLogManager(LocalLogManager logManager) {
- if (logManagers.put(logManager.nodeId, logManager) != null) {
- throw new RuntimeException("Can't have multiple
LocalLogManagers " +
- "with id " + logManager.nodeId());
+ synchronized void registerRaftClient(MockRaftClient raftClient) {
+ if (raftClients.put(raftClient.nodeId, raftClient) != null) {
+ throw new RuntimeException("Can't have multiple
MockRaftClients " +
+ "with id " + raftClient.nodeId());
}
electLeaderIfNeeded();
}
- synchronized void unregisterLogManager(LocalLogManager logManager) {
- if (!logManagers.remove(logManager.nodeId, logManager)) {
- throw new RuntimeException("Log manager " +
logManager.nodeId() +
+ synchronized void unregisterRaftClient(MockRaftClient raftClient) {
+ if (!raftClients.remove(raftClient.nodeId, raftClient)) {
+ throw new RuntimeException("MockRaftClient " +
raftClient.nodeId() +
" was not found.");
}
}
@@ -259,19 +259,19 @@ public final class LocalLogManager implements
RaftClient<ApiMessageAndVersion>,
if (batch instanceof LeaderChangeBatch leaderChangeBatch) {
leader = leaderChangeBatch.newLeader;
}
- for (LocalLogManager logManager : logManagers.values()) {
- logManager.scheduleLogCheck();
+ for (MockRaftClient raftClient : raftClients.values()) {
+ raftClient.scheduleLogCheck();
}
prevOffset = nextEndOffset;
return nextEndOffset;
}
synchronized void electLeaderIfNeeded() {
- if (leader.leaderId().isPresent() || logManagers.isEmpty()) {
+ if (leader.leaderId().isPresent() || raftClients.isEmpty()) {
return;
}
- int nextLeaderIndex =
ThreadLocalRandom.current().nextInt(logManagers.size());
- Iterator<Integer> iter = logManagers.keySet().iterator();
+ int nextLeaderIndex =
ThreadLocalRandom.current().nextInt(raftClients.size());
+ Iterator<Integer> iter = raftClients.keySet().iterator();
Integer nextLeaderNode = null;
for (int i = 0; i <= nextLeaderIndex; i++) {
nextLeaderNode = iter.next();
@@ -294,7 +294,7 @@ public final class LocalLogManager implements
RaftClient<ApiMessageAndVersion>,
}
/**
- * Optionally return a snapshot reader if the offset if less than the
first batch.
+ * Optionally return a snapshot reader if the offset is less than the
first batch.
*/
synchronized Optional<RawSnapshotReader> nextSnapshot(long offset) {
return Optional.ofNullable(snapshots.lastEntry()).flatMap(entry ->
{
@@ -325,7 +325,7 @@ public final class LocalLogManager implements
RaftClient<ApiMessageAndVersion>,
/**
* Returns the snapshot id of the latest snapshot if there is one.
*
- * If a snapshot doesn't exists, it return an empty Optional.
+ * If a snapshot doesn't exist, it returns an empty Optional.
*/
synchronized Optional<OffsetAndEpoch> latestSnapshotId() {
return Optional.ofNullable(snapshots.lastEntry()).map(entry ->
entry.getValue().snapshotId());
@@ -400,65 +400,65 @@ public final class LocalLogManager implements
RaftClient<ApiMessageAndVersion>,
private final Logger log;
/**
- * The node ID of this local log manager. Each log manager must have a
unique ID.
+ * The node ID of this raft client. Each raft client must have a unique ID.
*/
private final int nodeId;
/**
- * A reference to the in-memory state that unites all the log managers in
use.
+ * A reference to the in-memory state that unites all the raft clients in
use.
*/
private final SharedLogData shared;
/**
- * The event queue used by this local log manager.
+ * The event queue used by this raft client.
*/
private final EventQueue eventQueue;
/**
- * The latest kraft version used by this local log manager.
+ * The latest kraft version used by this raft client.
*/
private KRaftVersion lastKRaftVersion;
/**
- * Whether this LocalLogManager has been shut down.
+ * Whether this raft client has been shut down.
*/
private boolean shutdown = false;
/**
- * An offset that the log manager will not read beyond. This exists only
for testing
+ * An offset that the raft client will not read beyond. This exists only
for testing
* purposes.
*/
private long maxReadOffset;
/**
- * The listener objects attached to this local log manager.
+ * The listener objects attached to this raft client.
*/
private final Map<Listener<ApiMessageAndVersion>, MetaLogListenerData>
listeners = new IdentityHashMap<>();
/**
- * The current leader, as seen by this log manager.
+ * The current leader, as seen by this raft client.
*/
private volatile LeaderAndEpoch leader = new
LeaderAndEpoch(OptionalInt.empty(), 0);
- /*
+ /**
* If this variable is true the next scheduleAppend will fail
*/
private final AtomicBoolean throwOnNextAppend = new AtomicBoolean(false);
- public LocalLogManager(LogContext logContext,
- int nodeId,
- SharedLogData shared,
- String threadNamePrefix,
- KRaftVersion lastKRaftVersion) {
+ public MockRaftClient(LogContext logContext,
+ int nodeId,
+ SharedLogData shared,
+ String threadNamePrefix,
+ KRaftVersion lastKRaftVersion) {
this.logContext = logContext;
- this.log = logContext.logger(LocalLogManager.class);
+ this.log = logContext.logger(MockRaftClient.class);
this.nodeId = nodeId;
this.shared = shared;
this.maxReadOffset = shared.initialMaxReadOffset();
this.eventQueue = new KafkaEventQueue(Time.SYSTEM, logContext,
threadNamePrefix, new ShutdownEvent());
this.lastKRaftVersion = lastKRaftVersion;
- shared.registerLogManager(this);
+ this.shared.registerRaftClient(this);
}
private void scheduleLogCheck() {
@@ -477,7 +477,7 @@ public final class LocalLogManager implements
RaftClient<ApiMessageAndVersion>,
listenerData.handleLoadSnapshot(
RecordsSnapshotReader.of(
snapshot.get(),
- new MetadataRecordSerde(),
+ new MetadataRecordSerde(),
BufferSupplier.create(),
Integer.MAX_VALUE,
true,
@@ -556,7 +556,7 @@ public final class LocalLogManager implements
RaftClient<ApiMessageAndVersion>,
}
public void beginShutdown() {
- eventQueue.beginShutdown("beginShutdown");
+ eventQueue.beginShutdown("MockKafkaRaftClient");
}
class ShutdownEvent implements EventQueue.Event {
@@ -569,7 +569,7 @@ public final class LocalLogManager implements
RaftClient<ApiMessageAndVersion>,
for (MetaLogListenerData listenerData :
listeners.values()) {
listenerData.beginShutdown();
}
- shared.unregisterLogManager(LocalLogManager.this);
+ shared.unregisterRaftClient(MockRaftClient.this);
}
} catch (Exception e) {
log.error("Unexpected exception while sending beginShutdown
callbacks", e);
@@ -592,7 +592,7 @@ public final class LocalLogManager implements
RaftClient<ApiMessageAndVersion>,
}
/**
- * Shutdown the log manager.
+ * Shutdown the raft client.
*
* Even though the API suggests a non-blocking shutdown, this method
always returns a completed
* future. This means that shutdown is a blocking operation.
@@ -614,7 +614,7 @@ public final class LocalLogManager implements
RaftClient<ApiMessageAndVersion>,
CompletableFuture<Void> future = new CompletableFuture<>();
eventQueue.append(() -> {
if (shutdown) {
- log.info("Node {}: can't register because local log manager
has " +
+ log.info("Node {}: can't register because raft client has " +
"already been shut down.", nodeId);
future.complete(null);
} else {
@@ -643,7 +643,7 @@ public final class LocalLogManager implements
RaftClient<ApiMessageAndVersion>,
public void unregister(RaftClient.Listener<ApiMessageAndVersion> listener)
{
eventQueue.append(() -> {
if (shutdown) {
- log.info("Node {}: can't unregister because local log manager
is shutdown", nodeId);
+ log.info("Node {}: can't unregister because raft client is
shutdown", nodeId);
} else {
int id = System.identityHashCode(listener);
if (listeners.remove(listener) == null) {
diff --git
a/metadata/src/test/java/org/apache/kafka/metalog/MockMetaLogManagerListener.java
b/metadata/src/test/java/org/apache/kafka/controller/MockRaftClientListener.java
similarity index 95%
rename from
metadata/src/test/java/org/apache/kafka/metalog/MockMetaLogManagerListener.java
rename to
metadata/src/test/java/org/apache/kafka/controller/MockRaftClientListener.java
index 53bc49f8075..b4ee74346ee 100644
---
a/metadata/src/test/java/org/apache/kafka/metalog/MockMetaLogManagerListener.java
+++
b/metadata/src/test/java/org/apache/kafka/controller/MockRaftClientListener.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.kafka.metalog;
+package org.apache.kafka.controller;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.raft.Batch;
@@ -29,7 +29,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.OptionalInt;
-public class MockMetaLogManagerListener implements
RaftClient.Listener<ApiMessageAndVersion> {
+public class MockRaftClientListener implements
RaftClient.Listener<ApiMessageAndVersion> {
public static final String COMMIT = "COMMIT";
public static final String LAST_COMMITTED_OFFSET = "LAST_COMMITTED_OFFSET";
public static final String NEW_LEADER = "NEW_LEADER";
@@ -41,7 +41,7 @@ public class MockMetaLogManagerListener implements
RaftClient.Listener<ApiMessag
private final List<String> serializedEvents = new ArrayList<>();
private LeaderAndEpoch leaderAndEpoch = new
LeaderAndEpoch(OptionalInt.empty(), 0);
- public MockMetaLogManagerListener(int nodeId) {
+ public MockRaftClientListener(int nodeId) {
this.nodeId = nodeId;
}
diff --git
a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTest.java
b/metadata/src/test/java/org/apache/kafka/controller/MockRaftClientTest.java
similarity index 72%
rename from
metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTest.java
rename to
metadata/src/test/java/org/apache/kafka/controller/MockRaftClientTest.java
index b42f781e985..4217bd4ed26 100644
--- a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/MockRaftClientTest.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.kafka.metalog;
+package org.apache.kafka.controller;
import org.apache.kafka.common.metadata.RegisterBrokerRecord;
import org.apache.kafka.raft.LeaderAndEpoch;
@@ -28,23 +28,23 @@ import org.junit.jupiter.api.Timeout;
import java.util.List;
import java.util.OptionalInt;
-import static org.apache.kafka.metalog.MockMetaLogManagerListener.COMMIT;
-import static
org.apache.kafka.metalog.MockMetaLogManagerListener.LAST_COMMITTED_OFFSET;
-import static org.apache.kafka.metalog.MockMetaLogManagerListener.SHUTDOWN;
+import static org.apache.kafka.controller.MockRaftClientListener.COMMIT;
+import static
org.apache.kafka.controller.MockRaftClientListener.LAST_COMMITTED_OFFSET;
+import static org.apache.kafka.controller.MockRaftClientListener.SHUTDOWN;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
@Timeout(value = 40)
-public class LocalLogManagerTest {
+public class MockRaftClientTest {
/**
- * Test creating a LocalLogManager and closing it.
+ * Test creating a MockRaftClient and closing it.
*/
@Test
public void testCreateAndClose() throws Exception {
try (
- LocalLogManagerTestEnv env = new LocalLogManagerTestEnv.Builder(1).
+ MockRaftClientTestEnv env = new MockRaftClientTestEnv.Builder(1).
buildWithMockListeners()
) {
env.close();
@@ -53,12 +53,12 @@ public class LocalLogManagerTest {
}
/**
- * Test that the local log manager will claim leadership.
+ * Test that the raft client will claim leadership.
*/
@Test
public void testClaimsLeadership() throws Exception {
try (
- LocalLogManagerTestEnv env = new LocalLogManagerTestEnv.Builder(1).
+ MockRaftClientTestEnv env = new MockRaftClientTestEnv.Builder(1).
buildWithMockListeners()
) {
assertEquals(new LeaderAndEpoch(OptionalInt.of(0), 1),
env.waitForLeader());
@@ -68,12 +68,12 @@ public class LocalLogManagerTest {
}
/**
- * Test that we can pass leadership back and forth between log managers.
+ * Test that we can pass leadership back and forth between raft clients.
*/
@Test
public void testPassLeadership() throws Exception {
try (
- LocalLogManagerTestEnv env = new LocalLogManagerTestEnv.Builder(3).
+ MockRaftClientTestEnv env = new MockRaftClientTestEnv.Builder(3).
buildWithMockListeners()
) {
LeaderAndEpoch first = env.waitForLeader();
@@ -82,7 +82,7 @@ public class LocalLogManagerTest {
int currentLeaderId = cur.leaderId().orElseThrow(() ->
new AssertionError("Current leader is undefined")
);
- env.logManagers().get(currentLeaderId).resign(cur.epoch());
+ env.raftClients().get(currentLeaderId).resign(cur.epoch());
LeaderAndEpoch next = env.waitForLeader();
while (next.epoch() == cur.epoch()) {
@@ -100,10 +100,9 @@ public class LocalLogManagerTest {
}
private static void waitForLastCommittedOffset(long targetOffset,
- LocalLogManager logManager) throws InterruptedException {
+ MockRaftClient raftClient) throws InterruptedException {
TestUtils.retryOnExceptionWithTimeout(20000, 3, () -> {
- MockMetaLogManagerListener listener =
- (MockMetaLogManagerListener) logManager.listeners().get(0);
+ MockRaftClientListener listener = (MockRaftClientListener)
raftClient.listeners().get(0);
long highestOffset = -1;
for (String event : listener.serializedEvents()) {
if (event.startsWith(LAST_COMMITTED_OFFSET)) {
@@ -117,19 +116,19 @@ public class LocalLogManagerTest {
}
}
if (highestOffset < targetOffset) {
- throw new RuntimeException("Offset for log manager " +
- logManager.nodeId() + " only reached " + highestOffset);
+ throw new RuntimeException("Offset for raft client " +
+ raftClient.nodeId() + " only reached " + highestOffset);
}
});
}
/**
- * Test that all the log managers see all the commits.
+ * Test that all the raft clients see all the commits.
*/
@Test
public void testCommits() throws Exception {
try (
- LocalLogManagerTestEnv env = new LocalLogManagerTestEnv.Builder(3).
+ MockRaftClientTestEnv env = new MockRaftClientTestEnv.Builder(3).
buildWithMockListeners()
) {
LeaderAndEpoch leaderInfo = env.waitForLeader();
@@ -137,22 +136,24 @@ public class LocalLogManagerTest {
new AssertionError("Current leader is undefined")
);
- LocalLogManager activeLogManager = env.logManagers().get(leaderId);
- int epoch = activeLogManager.leaderAndEpoch().epoch();
+ MockRaftClient activeRaftClient = env.raftClients().get(leaderId);
+ int epoch = activeRaftClient.leaderAndEpoch().epoch();
List<ApiMessageAndVersion> messages = List.of(
new ApiMessageAndVersion(new
RegisterBrokerRecord().setBrokerId(0), (short) 0),
new ApiMessageAndVersion(new
RegisterBrokerRecord().setBrokerId(1), (short) 0),
new ApiMessageAndVersion(new
RegisterBrokerRecord().setBrokerId(2), (short) 0));
- assertEquals(3, activeLogManager.prepareAppend(epoch, messages));
- activeLogManager.schedulePreparedAppend();
- for (LocalLogManager logManager : env.logManagers()) {
- waitForLastCommittedOffset(3, logManager);
+ assertEquals(3, activeRaftClient.prepareAppend(epoch, messages));
+
+ activeRaftClient.schedulePreparedAppend();
+ for (MockRaftClient raftClient : env.raftClients()) {
+ waitForLastCommittedOffset(3, raftClient);
}
- List<MockMetaLogManagerListener> listeners =
env.logManagers().stream().
- map(m -> (MockMetaLogManagerListener) m.listeners().get(0)).
+
+ List<MockRaftClientListener> listeners =
env.raftClients().stream().
+ map(m -> (MockRaftClientListener) m.listeners().get(0)).
toList();
env.close();
- for (MockMetaLogManagerListener listener : listeners) {
+ for (MockRaftClientListener listener : listeners) {
List<String> events = listener.serializedEvents();
assertEquals(SHUTDOWN, events.get(events.size() - 1));
int foundIndex = 0;
diff --git
a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTestEnv.java
b/metadata/src/test/java/org/apache/kafka/controller/MockRaftClientTestEnv.java
similarity index 73%
rename from
metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTestEnv.java
rename to
metadata/src/test/java/org/apache/kafka/controller/MockRaftClientTestEnv.java
index a1e6742f8ef..6d1e741d1d8 100644
---
a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTestEnv.java
+++
b/metadata/src/test/java/org/apache/kafka/controller/MockRaftClientTestEnv.java
@@ -15,14 +15,14 @@
* limitations under the License.
*/
-package org.apache.kafka.metalog;
+package org.apache.kafka.controller;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.metalog.LocalLogManager.LeaderChangeBatch;
-import org.apache.kafka.metalog.LocalLogManager.LocalRecordBatch;
-import org.apache.kafka.metalog.LocalLogManager.SharedLogData;
+import org.apache.kafka.controller.MockRaftClient.LeaderChangeBatch;
+import org.apache.kafka.controller.MockRaftClient.LocalRecordBatch;
+import org.apache.kafka.controller.MockRaftClient.SharedLogData;
import org.apache.kafka.raft.LeaderAndEpoch;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.KRaftVersion;
@@ -41,9 +41,9 @@ import java.util.OptionalInt;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
-public class LocalLogManagerTestEnv implements AutoCloseable {
+public class MockRaftClientTestEnv implements AutoCloseable {
private static final Logger log =
- LoggerFactory.getLogger(LocalLogManagerTestEnv.class);
+ LoggerFactory.getLogger(MockRaftClientTestEnv.class);
private final String clusterId;
@@ -59,23 +59,23 @@ public class LocalLogManagerTestEnv implements
AutoCloseable {
private final File dir;
/**
- * The shared data for our LocalLogManager instances.
+ * The shared data for our MockRaftClient instances.
*/
private final SharedLogData shared;
/**
- * A list of log managers.
+ * A list of raft clients.
*/
- private final List<LocalLogManager> logManagers;
+ private final List<MockRaftClient> raftClients;
public static class Builder {
- private final int numManagers;
+ private final int numNodes;
private Optional<RawSnapshotReader> snapshotReader = Optional.empty();
private Consumer<SharedLogData> sharedLogDataInitializer = __ -> { };
private KRaftVersion lastKRaftVersion = KRaftVersion.KRAFT_VERSION_0;
- public Builder(int numManagers) {
- this.numManagers = numManagers;
+ public Builder(int numNodes) {
+ this.numNodes = numNodes;
}
public Builder setSnapshotReader(RawSnapshotReader snapshotReader) {
@@ -96,19 +96,19 @@ public class LocalLogManagerTestEnv implements
AutoCloseable {
return this;
}
- public LocalLogManagerTestEnv build() {
- return new LocalLogManagerTestEnv(
- numManagers,
+ public MockRaftClientTestEnv build() {
+ return new MockRaftClientTestEnv(
+ numNodes,
snapshotReader,
sharedLogDataInitializer,
lastKRaftVersion);
}
- public LocalLogManagerTestEnv buildWithMockListeners() {
- LocalLogManagerTestEnv env = build();
+ public MockRaftClientTestEnv buildWithMockListeners() {
+ MockRaftClientTestEnv env = build();
try {
- for (LocalLogManager logManager : env.logManagers) {
- logManager.register(new
MockMetaLogManagerListener(logManager.nodeId().getAsInt()));
+ for (MockRaftClient raftClient : env.raftClients) {
+ raftClient.register(new
MockRaftClientListener(raftClient.nodeId().getAsInt()));
}
} catch (Exception e) {
try {
@@ -122,8 +122,8 @@ public class LocalLogManagerTestEnv implements
AutoCloseable {
}
}
- private LocalLogManagerTestEnv(
- int numManagers,
+ private MockRaftClientTestEnv(
+ int numNodes,
Optional<RawSnapshotReader> snapshotReader,
Consumer<SharedLogData> sharedLogDataInitializer,
KRaftVersion lastKRaftVersion
@@ -132,23 +132,23 @@ public class LocalLogManagerTestEnv implements
AutoCloseable {
dir = TestUtils.tempDirectory();
shared = new SharedLogData(snapshotReader);
sharedLogDataInitializer.accept(shared);
- List<LocalLogManager> newLogManagers = new ArrayList<>(numManagers);
+ List<MockRaftClient> newRaftClients = new ArrayList<>(numNodes);
try {
- for (int nodeId = 0; nodeId < numManagers; nodeId++) {
- newLogManagers.add(new LocalLogManager(
- new LogContext(String.format("[LocalLogManager %d] ",
nodeId)),
+ for (int nodeId = 0; nodeId < numNodes; nodeId++) {
+ newRaftClients.add(new MockRaftClient(
+ new LogContext(String.format("[MockRaftClient %d] ",
nodeId)),
nodeId,
shared,
- String.format("LocalLogManager-%d_", nodeId),
+ String.format("MockRaftClient-%d_", nodeId),
lastKRaftVersion));
}
} catch (Throwable t) {
- for (LocalLogManager logManager : newLogManagers) {
- logManager.close();
+ for (MockRaftClient raftClient : newRaftClients) {
+ raftClient.close();
}
throw t;
}
- this.logManagers = newLogManagers;
+ this.raftClients = newRaftClients;
}
/**
@@ -179,9 +179,9 @@ public class LocalLogManagerTestEnv implements
AutoCloseable {
AtomicReference<LeaderAndEpoch> value = new AtomicReference<>(null);
TestUtils.retryOnExceptionWithTimeout(20000, 3, () -> {
LeaderAndEpoch result = null;
- for (LocalLogManager logManager : logManagers) {
- LeaderAndEpoch leader = logManager.leaderAndEpoch();
- int nodeId = logManager.nodeId().getAsInt();
+ for (MockRaftClient raftClient : raftClients) {
+ LeaderAndEpoch leader = raftClient.leaderAndEpoch();
+ int nodeId = raftClient.nodeId().getAsInt();
if (leader.isLeader(nodeId)) {
if (result != null) {
throw new RuntimeException("node " + nodeId +
@@ -198,14 +198,14 @@ public class LocalLogManagerTestEnv implements
AutoCloseable {
return value.get();
}
- public List<LocalLogManager> logManagers() {
- return logManagers;
+ public List<MockRaftClient> raftClients() {
+ return raftClients;
}
- public Optional<LocalLogManager> activeLogManager() {
+ public Optional<MockRaftClient> activeRaftClient() {
OptionalInt leader = shared.leaderAndEpoch().leaderId();
if (leader.isPresent()) {
- return Optional.of(logManagers.get(leader.getAsInt()));
+ return Optional.of(raftClients.get(leader.getAsInt()));
} else {
return Optional.empty();
}
@@ -218,11 +218,11 @@ public class LocalLogManagerTestEnv implements
AutoCloseable {
@Override
public void close() throws InterruptedException {
try {
- for (LocalLogManager logManager : logManagers) {
- logManager.beginShutdown();
+ for (MockRaftClient raftClient : raftClients) {
+ raftClient.beginShutdown();
}
- for (LocalLogManager logManager : logManagers) {
- logManager.close();
+ for (MockRaftClient raftClient : raftClients) {
+ raftClient.close();
}
Utils.delete(dir);
} catch (IOException e) {
diff --git
a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerMetricsIntegrationTest.java
b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerMetricsIntegrationTest.java
index 7e082ac3033..894a7f739c4 100644
---
a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerMetricsIntegrationTest.java
+++
b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerMetricsIntegrationTest.java
@@ -24,7 +24,6 @@ import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.controller.metrics.QuorumControllerMetrics;
import org.apache.kafka.metadata.BrokerHeartbeatReply;
-import org.apache.kafka.metalog.LocalLogManagerTestEnv;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Test;
@@ -74,9 +73,9 @@ public class QuorumControllerMetricsIntegrationTest {
public void testClosingQuorumControllerClosesMetrics() throws Throwable {
MockControllerMetrics metrics = new MockControllerMetrics();
try (
- LocalLogManagerTestEnv logEnv = new
LocalLogManagerTestEnv.Builder(1).
+ MockRaftClientTestEnv clientEnv = new
MockRaftClientTestEnv.Builder(1).
build();
- QuorumControllerTestEnv controlEnv = new
QuorumControllerTestEnv.Builder(logEnv).
+ QuorumControllerTestEnv controlEnv = new
QuorumControllerTestEnv.Builder(clientEnv).
setControllerBuilderInitializer(controllerBuilder ->
controllerBuilder.setMetrics(metrics)
).
@@ -97,9 +96,9 @@ public class QuorumControllerMetricsIntegrationTest {
boolean forceFailoverUsingLogLayer
) throws Throwable {
try (
- LocalLogManagerTestEnv logEnv = new
LocalLogManagerTestEnv.Builder(3).
+ MockRaftClientTestEnv clientEnv = new
MockRaftClientTestEnv.Builder(3).
build();
- QuorumControllerTestEnv controlEnv = new
QuorumControllerTestEnv.Builder(logEnv).
+ QuorumControllerTestEnv controlEnv = new
QuorumControllerTestEnv.Builder(clientEnv).
build()
) {
registerBrokersAndUnfence(controlEnv.activeController(), 1); //
wait for a controller to become active.
@@ -109,7 +108,7 @@ public class QuorumControllerMetricsIntegrationTest {
}
});
if (forceFailoverUsingLogLayer) {
- logEnv.activeLogManager().get().throwOnNextAppend();
+ clientEnv.activeRaftClient().get().throwOnNextAppend();
TestUtils.retryOnExceptionWithTimeout(30_000, () ->
createTopics(controlEnv.activeController(), "test_", 1, 1)
@@ -133,9 +132,9 @@ public class QuorumControllerMetricsIntegrationTest {
@Test
public void testTimeoutMetrics() throws Throwable {
try (
- LocalLogManagerTestEnv logEnv = new
LocalLogManagerTestEnv.Builder(3).
+ MockRaftClientTestEnv clientEnv = new
MockRaftClientTestEnv.Builder(3).
build();
- QuorumControllerTestEnv controlEnv = new
QuorumControllerTestEnv.Builder(logEnv).
+ QuorumControllerTestEnv controlEnv = new
QuorumControllerTestEnv.Builder(clientEnv).
build()
) {
QuorumController active = controlEnv.activeController();
@@ -192,9 +191,9 @@ public class QuorumControllerMetricsIntegrationTest {
@Test
public void testEventQueueOperationsStartedMetric() throws Throwable {
try (
- LocalLogManagerTestEnv logEnv = new
LocalLogManagerTestEnv.Builder(3).
+ MockRaftClientTestEnv clientEnv = new
MockRaftClientTestEnv.Builder(3).
build();
- QuorumControllerTestEnv controlEnv = new
QuorumControllerTestEnv.Builder(logEnv).
+ QuorumControllerTestEnv controlEnv = new
QuorumControllerTestEnv.Builder(clientEnv).
build()
) {
QuorumController active = controlEnv.activeController();
diff --git
a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
index 41ef67ab885..3e58cabeac3 100644
---
a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
+++
b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
@@ -99,8 +99,6 @@ import
org.apache.kafka.metadata.RecordTestUtils.ImageDeltaPair;
import
org.apache.kafka.metadata.RecordTestUtils.TestThroughAllIntermediateImagesLeadingToFinalImageHelper;
import org.apache.kafka.metadata.bootstrap.BootstrapMetadata;
import org.apache.kafka.metadata.util.BatchFileWriter;
-import org.apache.kafka.metalog.LocalLogManager;
-import org.apache.kafka.metalog.LocalLogManagerTestEnv;
import org.apache.kafka.raft.Batch;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.EligibleLeaderReplicasVersion;
@@ -180,9 +178,9 @@ public class QuorumControllerTest {
@Test
public void testConfigurationOperations() throws Throwable {
try (
- LocalLogManagerTestEnv logEnv = new
LocalLogManagerTestEnv.Builder(1).
+ MockRaftClientTestEnv clientEnv = new
MockRaftClientTestEnv.Builder(1).
build();
- QuorumControllerTestEnv controlEnv = new
QuorumControllerTestEnv.Builder(logEnv).
+ QuorumControllerTestEnv controlEnv = new
QuorumControllerTestEnv.Builder(clientEnv).
build()
) {
controlEnv.activeController().registerBroker(ANONYMOUS_CONTEXT,
@@ -191,10 +189,10 @@ public class QuorumControllerTest {
Map.of(EligibleLeaderReplicasVersion.FEATURE_NAME,
EligibleLeaderReplicasVersion.ELRV_1.featureLevel()))).
setBrokerId(0).
setLogDirs(List.of(Uuid.fromString("iiaQjkRPQcuMULNII0MUeA"))).
- setClusterId(logEnv.clusterId())).get();
+ setClusterId(clientEnv.clusterId())).get();
testConfigurationOperations(controlEnv.activeController());
- testToImages(logEnv.allRecords());
+ testToImages(clientEnv.allRecords());
}
}
@@ -221,9 +219,9 @@ public class QuorumControllerTest {
@Test
public void testDelayedConfigurationOperations() throws Throwable {
try (
- LocalLogManagerTestEnv logEnv = new
LocalLogManagerTestEnv.Builder(1).
+ MockRaftClientTestEnv clientEnv = new
MockRaftClientTestEnv.Builder(1).
build();
- QuorumControllerTestEnv controlEnv = new
QuorumControllerTestEnv.Builder(logEnv).
+ QuorumControllerTestEnv controlEnv = new
QuorumControllerTestEnv.Builder(clientEnv).
build()
) {
controlEnv.activeController().registerBroker(ANONYMOUS_CONTEXT,
@@ -232,18 +230,18 @@ public class QuorumControllerTest {
Map.of(EligibleLeaderReplicasVersion.FEATURE_NAME,
EligibleLeaderReplicasVersion.ELRV_1.featureLevel()))).
setBrokerId(0).
setLogDirs(List.of(Uuid.fromString("sTbzRAMnTpahIyIPNjiLhw"))).
- setClusterId(logEnv.clusterId())).get();
- testDelayedConfigurationOperations(logEnv,
controlEnv.activeController());
+ setClusterId(clientEnv.clusterId())).get();
+ testDelayedConfigurationOperations(clientEnv,
controlEnv.activeController());
- testToImages(logEnv.allRecords());
+ testToImages(clientEnv.allRecords());
}
}
private void testDelayedConfigurationOperations(
- LocalLogManagerTestEnv logEnv,
+ MockRaftClientTestEnv clientEnv,
QuorumController controller
) throws Throwable {
- logEnv.logManagers().forEach(m -> m.setMaxReadOffset(1L));
+ clientEnv.raftClients().forEach(m -> m.setMaxReadOffset(1L));
CompletableFuture<Map<ConfigResource, ApiError>> future1 =
controller.incrementalAlterConfigs(ANONYMOUS_CONTEXT, Map.of(
BROKER0, Map.of("baz", entry(SET, "123"))), false);
@@ -252,7 +250,7 @@ public class QuorumControllerTest {
new ResultOrError<>(Map.of())),
controller.describeConfigs(ANONYMOUS_CONTEXT, Map.of(
BROKER0, List.of())).get());
- logEnv.logManagers().forEach(m -> m.setMaxReadOffset(8L));
+ clientEnv.raftClients().forEach(m -> m.setMaxReadOffset(8L));
assertEquals(Map.of(BROKER0, ApiError.NONE), future1.get());
}
@@ -266,9 +264,9 @@ public class QuorumControllerTest {
long sessionTimeoutMillis = 1000;
try (
- LocalLogManagerTestEnv logEnv = new
LocalLogManagerTestEnv.Builder(1).
+ MockRaftClientTestEnv clientEnv = new
MockRaftClientTestEnv.Builder(1).
build();
- QuorumControllerTestEnv controlEnv = new
QuorumControllerTestEnv.Builder(logEnv).
+ QuorumControllerTestEnv controlEnv = new
QuorumControllerTestEnv.Builder(clientEnv).
setSessionTimeoutMillis(OptionalLong.of(sessionTimeoutMillis)).
setBootstrapMetadata(SIMPLE_BOOTSTRAP).
build()
@@ -344,7 +342,7 @@ public class QuorumControllerTest {
// Check that there are imbalanced partitions
assertTrue(active.replicationControl().arePartitionLeadersImbalanced());
- testToImages(logEnv.allRecords());
+ testToImages(clientEnv.allRecords());
}
}
@@ -352,9 +350,9 @@ public class QuorumControllerTest {
public void testElrEnabledByDefault() throws Throwable {
long sessionTimeoutMillis = 500;
try (
- LocalLogManagerTestEnv logEnv = new
LocalLogManagerTestEnv.Builder(1).
+ MockRaftClientTestEnv clientEnv = new
MockRaftClientTestEnv.Builder(1).
build();
- QuorumControllerTestEnv controlEnv = new
QuorumControllerTestEnv.Builder(logEnv).
+ QuorumControllerTestEnv controlEnv = new
QuorumControllerTestEnv.Builder(clientEnv).
setSessionTimeoutMillis(OptionalLong.of(sessionTimeoutMillis)).
setBootstrapMetadata(BootstrapMetadata.fromRecords(
List.of(
@@ -384,9 +382,9 @@ public class QuorumControllerTest {
long sessionTimeoutMillis = 500;
try (
- LocalLogManagerTestEnv logEnv = new
LocalLogManagerTestEnv.Builder(1).
+ MockRaftClientTestEnv clientEnv = new
MockRaftClientTestEnv.Builder(1).
build();
- QuorumControllerTestEnv controlEnv = new
QuorumControllerTestEnv.Builder(logEnv).
+ QuorumControllerTestEnv controlEnv = new
QuorumControllerTestEnv.Builder(clientEnv).
setSessionTimeoutMillis(OptionalLong.of(sessionTimeoutMillis)).
setBootstrapMetadata(BootstrapMetadata.fromVersion(MetadataVersion.IBP_4_0_IV1,
"test-provided bootstrap ELR enabled")).
build()
@@ -521,9 +519,9 @@ public class QuorumControllerTest {
long sessionTimeoutMillis = 500;
try (
- LocalLogManagerTestEnv logEnv = new
LocalLogManagerTestEnv.Builder(1).
+ MockRaftClientTestEnv clientEnv = new
MockRaftClientTestEnv.Builder(1).
build();
- QuorumControllerTestEnv controlEnv = new
QuorumControllerTestEnv.Builder(logEnv)
+ QuorumControllerTestEnv controlEnv = new
QuorumControllerTestEnv.Builder(clientEnv)
.setControllerBuilderInitializer(controllerBuilder ->
controllerBuilder.setFenceStaleBrokerIntervalNs(TimeUnit.SECONDS.toNanos(15)))
.setSessionTimeoutMillis(OptionalLong.of(sessionTimeoutMillis))
@@ -618,8 +616,8 @@ public class QuorumControllerTest {
long sessionTimeoutMillis = 300;
try (
- LocalLogManagerTestEnv logEnv = new
LocalLogManagerTestEnv.Builder(1).build();
- QuorumControllerTestEnv controlEnv = new
QuorumControllerTestEnv.Builder(logEnv).
+ MockRaftClientTestEnv clientEnv = new
MockRaftClientTestEnv.Builder(1).build();
+ QuorumControllerTestEnv controlEnv = new
QuorumControllerTestEnv.Builder(clientEnv).
setSessionTimeoutMillis(OptionalLong.of(sessionTimeoutMillis)).
setBootstrapMetadata(BootstrapMetadata.fromVersion(MetadataVersion.IBP_4_0_IV1,
"test-provided bootstrap ELR enabled")).
build()
@@ -747,9 +745,9 @@ public class QuorumControllerTest {
long leaderImbalanceCheckIntervalNs = 1_000_000_000;
try (
- LocalLogManagerTestEnv logEnv = new
LocalLogManagerTestEnv.Builder(1).
+ MockRaftClientTestEnv clientEnv = new
MockRaftClientTestEnv.Builder(1).
build();
- QuorumControllerTestEnv controlEnv = new
QuorumControllerTestEnv.Builder(logEnv).
+ QuorumControllerTestEnv controlEnv = new
QuorumControllerTestEnv.Builder(clientEnv).
setSessionTimeoutMillis(OptionalLong.of(sessionTimeoutMillis)).
setLeaderImbalanceCheckIntervalNs(OptionalLong.of(leaderImbalanceCheckIntervalNs)).
setBootstrapMetadata(SIMPLE_BOOTSTRAP).
@@ -873,7 +871,7 @@ public class QuorumControllerTest {
"Leaders were not balanced after unfencing all of the brokers"
);
- testToImages(logEnv.allRecords());
+ testToImages(clientEnv.allRecords());
}
}
@@ -886,9 +884,9 @@ public class QuorumControllerTest {
long maxIdleIntervalNs = TimeUnit.MICROSECONDS.toNanos(100);
long maxReplicationDelayMs = 1_000;
try (
- LocalLogManagerTestEnv logEnv = new
LocalLogManagerTestEnv.Builder(3).
+ MockRaftClientTestEnv clientEnv = new
MockRaftClientTestEnv.Builder(3).
build();
- QuorumControllerTestEnv controlEnv = new
QuorumControllerTestEnv.Builder(logEnv).
+ QuorumControllerTestEnv controlEnv = new
QuorumControllerTestEnv.Builder(clientEnv).
setControllerBuilderInitializer(controllerBuilder ->
controllerBuilder.setMaxIdleIntervalNs(OptionalLong.of(maxIdleIntervalNs))
).
@@ -898,29 +896,29 @@ public class QuorumControllerTest {
listeners.add(new
Listener().setName("PLAINTEXT").setHost("localhost").setPort(9092));
QuorumController active = controlEnv.activeController();
- LocalLogManager localLogManager = logEnv
- .logManagers()
+ MockRaftClient mockRaftClient = clientEnv
+ .raftClients()
.stream()
.filter(logManager ->
logManager.nodeId().equals(OptionalInt.of(active.nodeId())))
.findAny()
.get();
TestUtils.waitForCondition(
- () -> localLogManager.highWatermark().isPresent(),
+ () -> mockRaftClient.highWatermark().isPresent(),
maxReplicationDelayMs,
"High watermark was not established"
);
- final long firstHighWatermark =
localLogManager.highWatermark().getAsLong();
+ final long firstHighWatermark =
mockRaftClient.highWatermark().getAsLong();
TestUtils.waitForCondition(
- () -> localLogManager.highWatermark().getAsLong() >
firstHighWatermark,
+ () -> mockRaftClient.highWatermark().getAsLong() >
firstHighWatermark,
maxReplicationDelayMs,
"Active controller didn't write NoOpRecord the first time"
);
// Do it again to make sure that we are not counting the leader
change record
- final long secondHighWatermark =
localLogManager.highWatermark().getAsLong();
+ final long secondHighWatermark =
mockRaftClient.highWatermark().getAsLong();
TestUtils.waitForCondition(
- () -> localLogManager.highWatermark().getAsLong() >
secondHighWatermark,
+ () -> mockRaftClient.highWatermark().getAsLong() >
secondHighWatermark,
maxReplicationDelayMs,
"Active controller didn't write NoOpRecord the second time"
);
@@ -931,10 +929,10 @@ public class QuorumControllerTest {
@CsvSource(value = {"0, 0", "0, 1", "1, 0", "1, 1"})
public void testRegisterBrokerKRaftVersions(short finalizedKraftVersion,
short brokerMaxSupportedKraftVersion) throws Throwable {
try (
- LocalLogManagerTestEnv logEnv = new
LocalLogManagerTestEnv.Builder(1).
+ MockRaftClientTestEnv clientEnv = new
MockRaftClientTestEnv.Builder(1).
setLastKRaftVersion(KRaftVersion.fromFeatureLevel(finalizedKraftVersion)).
build();
- QuorumControllerTestEnv controlEnv = new
QuorumControllerTestEnv.Builder(logEnv).
+ QuorumControllerTestEnv controlEnv = new
QuorumControllerTestEnv.Builder(clientEnv).
setBootstrapMetadata(SIMPLE_BOOTSTRAP).
build()
) {
@@ -983,9 +981,9 @@ public class QuorumControllerTest {
@Test
public void testUnregisterBroker() throws Throwable {
try (
- LocalLogManagerTestEnv logEnv = new
LocalLogManagerTestEnv.Builder(1).
+ MockRaftClientTestEnv clientEnv = new
MockRaftClientTestEnv.Builder(1).
build();
- QuorumControllerTestEnv controlEnv = new
QuorumControllerTestEnv.Builder(logEnv).
+ QuorumControllerTestEnv controlEnv = new
QuorumControllerTestEnv.Builder(clientEnv).
build()
) {
ListenerCollection listeners = new ListenerCollection();
@@ -1041,7 +1039,7 @@ public class QuorumControllerTest {
});
assertEquals(0, topicPartitionFuture.get().partitionId());
- testToImages(logEnv.allRecords());
+ testToImages(clientEnv.allRecords());
}
}
@@ -1063,14 +1061,14 @@ public class QuorumControllerTest {
Map<Integer, Long> brokerEpochs = new HashMap<>();
Uuid fooId;
try (
- LocalLogManagerTestEnv logEnv = new
LocalLogManagerTestEnv.Builder(3).
+ MockRaftClientTestEnv clientEnv = new
MockRaftClientTestEnv.Builder(3).
build();
- QuorumControllerTestEnv controlEnv = new
QuorumControllerTestEnv.Builder(logEnv).
+ QuorumControllerTestEnv controlEnv = new
QuorumControllerTestEnv.Builder(clientEnv).
setBootstrapMetadata(SIMPLE_BOOTSTRAP).
build()
) {
QuorumController active = controlEnv.activeController();
- for (int i = 0; i < logEnv.logManagers().size(); i++) {
+ for (int i = 0; i < clientEnv.raftClients().size(); i++) {
active.registerController(ANONYMOUS_CONTEXT,
new ControllerRegistrationRequestData().
setControllerId(i).
@@ -1131,9 +1129,9 @@ public class QuorumControllerTest {
active.allocateProducerIds(ANONYMOUS_CONTEXT,
new
AllocateProducerIdsRequestData().setBrokerId(0).setBrokerEpoch(brokerEpochs.get(0))).get();
controlEnv.close();
- assertEquals(generateTestRecords(fooId, brokerEpochs),
logEnv.allRecords());
+ assertEquals(generateTestRecords(fooId, brokerEpochs),
clientEnv.allRecords());
- testToImages(logEnv.allRecords());
+ testToImages(clientEnv.allRecords());
}
}
@@ -1269,8 +1267,8 @@ public class QuorumControllerTest {
@Test
public void testTimeouts() throws Throwable {
try (
- LocalLogManagerTestEnv logEnv = new
LocalLogManagerTestEnv.Builder(1).build();
- QuorumControllerTestEnv controlEnv = new
QuorumControllerTestEnv.Builder(logEnv).
+ MockRaftClientTestEnv clientEnv = new
MockRaftClientTestEnv.Builder(1).build();
+ QuorumControllerTestEnv controlEnv = new
QuorumControllerTestEnv.Builder(clientEnv).
build()
) {
QuorumController controller = controlEnv.activeController();
@@ -1315,7 +1313,7 @@ public class QuorumControllerTest {
assertYieldsTimeout(alterReassignmentsFuture);
assertYieldsTimeout(listReassignmentsFuture);
- testToImages(logEnv.allRecords());
+ testToImages(clientEnv.allRecords());
}
}
@@ -1330,9 +1328,9 @@ public class QuorumControllerTest {
@Test
public void testEarlyControllerResults() throws Throwable {
try (
- LocalLogManagerTestEnv logEnv = new
LocalLogManagerTestEnv.Builder(1).
+ MockRaftClientTestEnv clientEnv = new
MockRaftClientTestEnv.Builder(1).
build();
- QuorumControllerTestEnv controlEnv = new
QuorumControllerTestEnv.Builder(logEnv).
+ QuorumControllerTestEnv controlEnv = new
QuorumControllerTestEnv.Builder(clientEnv).
build()
) {
QuorumController controller = controlEnv.activeController();
@@ -1362,16 +1360,16 @@ public class QuorumControllerTest {
alterReassignmentsFuture.get();
countDownLatch.countDown();
- testToImages(logEnv.allRecords());
+ testToImages(clientEnv.allRecords());
}
}
@Test
public void testConfigResourceExistenceChecker() throws Throwable {
try (
- LocalLogManagerTestEnv logEnv = new
LocalLogManagerTestEnv.Builder(3).
+ MockRaftClientTestEnv clientEnv = new
MockRaftClientTestEnv.Builder(3).
build();
- QuorumControllerTestEnv controlEnv = new
QuorumControllerTestEnv.Builder(logEnv).
+ QuorumControllerTestEnv controlEnv = new
QuorumControllerTestEnv.Builder(clientEnv).
build()
) {
QuorumController active = controlEnv.activeController();
@@ -1402,16 +1400,16 @@ public class QuorumControllerTest {
assertThrows(UnknownTopicOrPartitionException.class,
() -> checker.accept(new ConfigResource(TOPIC, "bar")));
- testToImages(logEnv.allRecords());
+ testToImages(clientEnv.allRecords());
}
}
@Test
public void testFatalMetadataReplayErrorOnActive() throws Throwable {
try (
- LocalLogManagerTestEnv logEnv = new
LocalLogManagerTestEnv.Builder(3).
+ MockRaftClientTestEnv clientEnv = new
MockRaftClientTestEnv.Builder(3).
build();
- QuorumControllerTestEnv controlEnv = new
QuorumControllerTestEnv.Builder(logEnv).
+ QuorumControllerTestEnv controlEnv = new
QuorumControllerTestEnv.Builder(clientEnv).
build()
) {
QuorumController active = controlEnv.activeController();
@@ -1435,14 +1433,14 @@ public class QuorumControllerTest {
new ApiMessageAndVersion(new PartitionRecord(), (short) 0))
);
- LocalLogManagerTestEnv.Builder logEnvBuilder = new
LocalLogManagerTestEnv.Builder(3)
+ MockRaftClientTestEnv.Builder clientEnvBuilder = new
MockRaftClientTestEnv.Builder(3)
.setSnapshotReader(FileRawSnapshotReader.open(
invalidSnapshot.tempDir.toPath(),
new OffsetAndEpoch(0, 0)
));
- try (LocalLogManagerTestEnv logEnv = logEnvBuilder.build()) {
- try (QuorumControllerTestEnv controlEnv = new
QuorumControllerTestEnv.Builder(logEnv).build()) {
+ try (MockRaftClientTestEnv clientEnv = clientEnvBuilder.build()) {
+ try (QuorumControllerTestEnv controlEnv = new
QuorumControllerTestEnv.Builder(clientEnv).build()) {
TestUtils.waitForCondition(() ->
controlEnv.controllers().stream().allMatch(
controller ->
controlEnv.fatalFaultHandler(controller.nodeId()).firstException() != null),
"At least one controller failed to detect the fatal
fault"
@@ -1454,11 +1452,11 @@ public class QuorumControllerTest {
@Test
public void testFatalMetadataErrorDuringLogLoading() throws Exception {
- try (LocalLogManagerTestEnv logEnv = new
LocalLogManagerTestEnv.Builder(3).build()) {
- logEnv.appendInitialRecords(List.of(
+ try (MockRaftClientTestEnv clientEnv = new
MockRaftClientTestEnv.Builder(3).build()) {
+ clientEnv.appendInitialRecords(List.of(
new ApiMessageAndVersion(new PartitionRecord(), (short)
0)));
- try (QuorumControllerTestEnv controlEnv = new
QuorumControllerTestEnv.Builder(logEnv).build()) {
+ try (QuorumControllerTestEnv controlEnv = new
QuorumControllerTestEnv.Builder(clientEnv).build()) {
TestUtils.waitForCondition(() ->
controlEnv.controllers().stream().allMatch(
controller ->
controlEnv.fatalFaultHandler(controller.nodeId()).firstException() != null),
"At least one controller failed to detect the fatal
fault"
@@ -1505,9 +1503,9 @@ public class QuorumControllerTest {
@Test
public void testInsertBootstrapRecordsToEmptyLog() throws Exception {
try (
- LocalLogManagerTestEnv logEnv = new
LocalLogManagerTestEnv.Builder(3).
+ MockRaftClientTestEnv clientEnv = new
MockRaftClientTestEnv.Builder(3).
build();
- QuorumControllerTestEnv controlEnv = new
QuorumControllerTestEnv.Builder(logEnv).
+ QuorumControllerTestEnv controlEnv = new
QuorumControllerTestEnv.Builder(clientEnv).
setBootstrapMetadata(COMPLEX_BOOTSTRAP).
build()
) {
@@ -1538,7 +1536,7 @@ public class QuorumControllerTest {
Map.of("foo", "bar").equals(resultOrError.result());
}, "Failed to see expected config change from bootstrap metadata");
- testToImages(logEnv.allRecords());
+ testToImages(clientEnv.allRecords());
}
}
diff --git
a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java
b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java
index b04a3e8c658..e6ececd2ba1 100644
---
a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java
+++
b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java
@@ -19,7 +19,6 @@ package org.apache.kafka.controller;
import org.apache.kafka.metadata.FakeKafkaConfigSchema;
import org.apache.kafka.metadata.bootstrap.BootstrapMetadata;
-import org.apache.kafka.metalog.LocalLogManagerTestEnv;
import org.apache.kafka.raft.LeaderAndEpoch;
import org.apache.kafka.server.common.EligibleLeaderReplicasVersion;
import org.apache.kafka.server.common.MetadataVersion;
@@ -41,20 +40,20 @@ import static java.util.concurrent.TimeUnit.NANOSECONDS;
public class QuorumControllerTestEnv implements AutoCloseable {
private final List<QuorumController> controllers;
- private final LocalLogManagerTestEnv logEnv;
+ private final MockRaftClientTestEnv clientEnv;
private final Map<Integer, MockFaultHandler> fatalFaultHandlers = new
HashMap<>();
private final Map<Integer, MockFaultHandler> nonFatalFaultHandlers = new
HashMap<>();
public static class Builder {
- private final LocalLogManagerTestEnv logEnv;
+ private final MockRaftClientTestEnv clientEnv;
private Consumer<QuorumController.Builder>
controllerBuilderInitializer = __ -> { };
private OptionalLong sessionTimeoutMillis = OptionalLong.empty();
private OptionalLong leaderImbalanceCheckIntervalNs =
OptionalLong.empty();
private BootstrapMetadata bootstrapMetadata = BootstrapMetadata.
fromVersion(MetadataVersion.latestTesting(), "test-provided
version");
- public Builder(LocalLogManagerTestEnv logEnv) {
- this.logEnv = logEnv;
+ public Builder(MockRaftClientTestEnv clientEnv) {
+ this.clientEnv = clientEnv;
}
public Builder
setControllerBuilderInitializer(Consumer<QuorumController.Builder>
controllerBuilderInitializer) {
@@ -79,7 +78,7 @@ public class QuorumControllerTestEnv implements AutoCloseable
{
public QuorumControllerTestEnv build() throws Exception {
return new QuorumControllerTestEnv(
- logEnv,
+ clientEnv,
controllerBuilderInitializer,
sessionTimeoutMillis,
leaderImbalanceCheckIntervalNs,
@@ -89,21 +88,21 @@ public class QuorumControllerTestEnv implements
AutoCloseable {
}
private QuorumControllerTestEnv(
- LocalLogManagerTestEnv logEnv,
+ MockRaftClientTestEnv clientEnv,
Consumer<QuorumController.Builder> controllerBuilderInitializer,
OptionalLong sessionTimeoutMillis,
OptionalLong leaderImbalanceCheckIntervalNs,
boolean eligibleLeaderReplicasEnabled,
BootstrapMetadata bootstrapMetadata
) throws Exception {
- this.logEnv = logEnv;
- int numControllers = logEnv.logManagers().size();
+ this.clientEnv = clientEnv;
+ int numControllers = clientEnv.raftClients().size();
this.controllers = new ArrayList<>(numControllers);
try {
List<Integer> nodeIds = IntStream.range(0,
numControllers).boxed().toList();
for (int nodeId = 0; nodeId < numControllers; nodeId++) {
- QuorumController.Builder builder = new
QuorumController.Builder(nodeId, logEnv.clusterId());
- builder.setRaftClient(logEnv.logManagers().get(nodeId));
+ QuorumController.Builder builder = new
QuorumController.Builder(nodeId, clientEnv.clusterId());
+ builder.setRaftClient(clientEnv.raftClients().get(nodeId));
if (eligibleLeaderReplicasEnabled) {
bootstrapMetadata =
bootstrapMetadata.copyWithFeatureRecord(
EligibleLeaderReplicasVersion.FEATURE_NAME,
@@ -140,7 +139,7 @@ public class QuorumControllerTestEnv implements
AutoCloseable {
QuorumController activeController(boolean waitForActivation) throws
InterruptedException {
AtomicReference<QuorumController> value = new AtomicReference<>(null);
TestUtils.retryOnExceptionWithTimeout(20000, 3, () -> {
- LeaderAndEpoch leader = logEnv.leaderAndEpoch();
+ LeaderAndEpoch leader = clientEnv.leaderAndEpoch();
for (QuorumController controller : controllers) {
if
(OptionalInt.of(controller.nodeId()).equals(leader.leaderId()) &&
controller.curClaimEpoch() == leader.epoch()) {