This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 282d7a1b9 RATIS-2290. Simplify the EventQueue in leader (#1258)
282d7a1b9 is described below
commit 282d7a1b9b09a072c80163f92c197c82de0ae7f8
Author: jianghuazhu <[email protected]>
AuthorDate: Thu May 15 02:30:01 2025 +0800
RATIS-2290. Simplify the EventQueue in leader (#1258)
---
.../apache/ratis/server/impl/LeaderStateImpl.java | 37 ++++++++++------------
1 file changed, 16 insertions(+), 21 deletions(-)
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
index dd4b9dc1a..714188896 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
@@ -112,12 +112,10 @@ class LeaderStateImpl implements LeaderState {
}
private final Type type;
- private final long newTerm;
private final Runnable handler;
- StateUpdateEvent(Type type, long newTerm, Runnable handler) {
+ StateUpdateEvent(Type type, Runnable handler) {
this.type = type;
- this.newTerm = newTerm;
this.handler = handler;
}
@@ -133,25 +131,30 @@ class LeaderStateImpl implements LeaderState {
return false;
}
final StateUpdateEvent that = (StateUpdateEvent)obj;
- return this.type == that.type && this.newTerm == that.newTerm;
+ return this.type == that.type;
}
@Override
public int hashCode() {
- return Objects.hash(type, newTerm);
+ return type.hashCode();
}
@Override
public String toString() {
- return type + (newTerm >= 0? ":" + newTerm: "");
+ return type.name();
}
}
private class EventQueue {
private final String name =
ServerStringUtils.generateUnifiedName(server.getMemberId(), getClass());
- private final BlockingQueue<StateUpdateEvent> queue = new
ArrayBlockingQueue<>(4096);
+ private final BlockingQueue<StateUpdateEvent> queue = new
ArrayBlockingQueue<>(
+ StateUpdateEvent.Type.values().length);;
- void submit(StateUpdateEvent event) {
+ // submit can be invoked by different threads -- need to be synchronized
+ synchronized void submit(StateUpdateEvent event) {
+ if (queue.contains(event)) { // avoid duplicated events
+ return;
+ }
try {
queue.put(event);
} catch (InterruptedException e) {
@@ -160,10 +163,10 @@ class LeaderStateImpl implements LeaderState {
}
}
+ // poll is invoked only by the EventProcessor thread -- synchronized is not needed
StateUpdateEvent poll() {
- final StateUpdateEvent e;
try {
- e = queue.poll(server.getMaxTimeoutMs(), TimeUnit.MILLISECONDS);
+ return queue.poll(server.getMaxTimeoutMs(), TimeUnit.MILLISECONDS);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
String s = this + ": poll() is interrupted";
@@ -174,14 +177,6 @@ class LeaderStateImpl implements LeaderState {
throw new IllegalStateException(s + " UNEXPECTEDLY", ie);
}
}
-
- if (e != null) {
- // remove duplicated events from the head.
- while(e.equals(queue.peek())) {
- queue.poll();
- }
- }
- return e;
}
@Override
@@ -323,9 +318,9 @@ class LeaderStateImpl implements LeaderState {
}
private final StateUpdateEvent updateCommitEvent =
- new StateUpdateEvent(StateUpdateEvent.Type.UPDATE_COMMIT, -1,
this::updateCommit);
+ new StateUpdateEvent(StateUpdateEvent.Type.UPDATE_COMMIT,
this::updateCommit);
private final StateUpdateEvent checkStagingEvent =
- new StateUpdateEvent(StateUpdateEvent.Type.CHECK_STAGING, -1,
this::checkStaging);
+ new StateUpdateEvent(StateUpdateEvent.Type.CHECK_STAGING,
this::checkStaging);
private final String name;
private final RaftServerImpl server;
@@ -704,7 +699,7 @@ class LeaderStateImpl implements LeaderState {
}
void submitStepDownEvent(long term, StepDownReason reason) {
- eventQueue.submit(new StateUpdateEvent(StateUpdateEvent.Type.STEP_DOWN,
term, () -> stepDown(term, reason)));
+ eventQueue.submit(new StateUpdateEvent(StateUpdateEvent.Type.STEP_DOWN, ()
-> stepDown(term, reason)));
}
private void stepDown(long term, StepDownReason reason) {