This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new d5cd3f6 RATIS-542. When a server restarts, purge(-1) causes IllegalStateException.
d5cd3f6 is described below
commit d5cd3f6c69082f42cb1a36d2c9a5a27f1d99a836
Author: Tsz Wo Nicholas Sze <[email protected]>
AuthorDate: Thu May 16 14:02:45 2019 -0700
RATIS-542. When a server restarts, purge(-1) causes IllegalStateException.
---
.../org/apache/ratis/server/impl/ServerState.java | 1 -
.../ratis/server/impl/StateMachineUpdater.java | 191 ++++++++++++---------
.../org/apache/ratis/server/raftlog/RaftLog.java | 1 +
.../apache/ratis/server/raftlog/RaftLogIndex.java | 7 +
.../ratis/server/raftlog/memory/MemoryRaftLog.java | 6 -
.../apache/ratis/server/storage/RaftStorage.java | 20 +--
6 files changed, 117 insertions(+), 109 deletions(-)
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
index 7827ea5..0c49716 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
@@ -158,7 +158,6 @@ public class ServerState implements Closeable {
private long initStatemachine(StateMachine sm, RaftGroupId gid)
throws IOException {
sm.initialize(server.getProxy(), gid, storage);
- storage.setStateMachineStorage(sm.getStateMachineStorage());
SnapshotInfo snapshot = sm.getLatestSnapshot();
if (snapshot == null || snapshot.getTermIndex().getIndex() < 0) {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
index 97fe4e4..d28c7ab 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -22,6 +22,8 @@ import org.apache.ratis.protocol.Message;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.raftlog.RaftLog;
import org.apache.ratis.proto.RaftProtos.LogEntryProto;
+import org.apache.ratis.server.raftlog.RaftLogIOException;
+import org.apache.ratis.server.raftlog.RaftLogIndex;
import org.apache.ratis.statemachine.SnapshotInfo;
import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.util.*;
@@ -31,7 +33,11 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
/**
* This class tracks the log entries that have been committed in a quorum and
@@ -44,38 +50,42 @@ import java.util.concurrent.CompletableFuture;
*/
class StateMachineUpdater implements Runnable {
static final Logger LOG = LoggerFactory.getLogger(StateMachineUpdater.class);
- private volatile Long stopIndex = null;
enum State {
RUNNING, STOP, RELOAD
}
- private final RaftProperties properties;
+ private final Consumer<Object> infoIndexChange;
+ private final Consumer<Object> debugIndexChange;
+ private final String name;
+
private final StateMachine stateMachine;
private final RaftServerImpl server;
private final RaftLog raftLog;
- private volatile long lastAppliedIndex;
-
- private final boolean autoSnapshotEnabled;
- private final long autoSnapshotThreshold;
- private long lastSnapshotIndex;
+ private final Long autoSnapshotThreshold;
private final Thread updater;
+ private final RaftLogIndex appliedIndex;
+ private final RaftLogIndex snapshotIndex;
+ private final AtomicReference<Long> stopIndex = new AtomicReference<>();
private volatile State state = State.RUNNING;
StateMachineUpdater(StateMachine stateMachine, RaftServerImpl server,
RaftLog raftLog, long lastAppliedIndex, RaftProperties properties) {
- this.properties = properties;
+ this.name = getClass().getSimpleName() + ":" + raftLog.getSelfId() + ":" +
server.getGroupId();
+ this.infoIndexChange = s -> LOG.info("{}: {}", name, s);
+ this.debugIndexChange = s -> LOG.debug("{}: {}", name, s);
+
this.stateMachine = stateMachine;
this.server = server;
this.raftLog = raftLog;
- this.lastAppliedIndex = lastAppliedIndex;
- lastSnapshotIndex = lastAppliedIndex;
+ this.appliedIndex = new RaftLogIndex("appliedIndex", lastAppliedIndex);
+ this.snapshotIndex = new RaftLogIndex("snapshotIndex", lastAppliedIndex);
- autoSnapshotEnabled =
RaftServerConfigKeys.Snapshot.autoTriggerEnabled(properties);
- autoSnapshotThreshold =
RaftServerConfigKeys.Snapshot.autoTriggerThreshold(properties);
+ final boolean autoSnapshot =
RaftServerConfigKeys.Snapshot.autoTriggerEnabled(properties);
+ this.autoSnapshotThreshold = autoSnapshot?
RaftServerConfigKeys.Snapshot.autoTriggerThreshold(properties): null;
updater = new Daemon(this);
}
@@ -87,25 +97,18 @@ class StateMachineUpdater implements Runnable {
state = State.STOP;
try {
stateMachine.close();
- } catch (IOException ignored) {
- LOG.warn(server.getId() + ": Failed to close "
- + stateMachine.getClass().getSimpleName()
- + " " + stateMachine, ignored);
+ } catch(Throwable t) {
+ LOG.warn(name + ": Failed to close " +
stateMachine.getClass().getSimpleName() + " " + stateMachine, t);
}
}
/**
* Stop the updater thread after all the committed transactions
* have been applied to the state machine.
- *
- * @throws InterruptedException
*/
void stopAndJoin() throws InterruptedException {
- if (stopIndex == null) {
- synchronized (this) {
- this.stopIndex = raftLog.getLastCommittedIndex();
- notifyUpdater();
- }
+ if (stopIndex.compareAndSet(null, raftLog.getLastCommittedIndex())) {
+ notifyUpdater();
LOG.info("{}: set stopIndex = {}", this, stopIndex);
}
updater.join();
@@ -122,76 +125,35 @@ class StateMachineUpdater implements Runnable {
@Override
public String toString() {
- return this.getClass().getSimpleName() + "-" + raftLog.getSelfId() + "-" +
server.getGroupId();
+ return name;
}
@Override
public void run() {
- while (isRunning()) {
+ for(; state != State.STOP; ) {
try {
- synchronized (this) {
- // when the peers just start, the committedIndex is initialized as 0
- // and will be updated only after the leader contacts other peers.
- // Thus initially lastAppliedIndex can be greater than lastCommitted.
- while (lastAppliedIndex >= raftLog.getLastCommittedIndex()
- && !shouldStop()) {
- wait();
- }
- }
-
- final long committedIndex = raftLog.getLastCommittedIndex();
- Preconditions.assertTrue(lastAppliedIndex <= committedIndex);
+ waitForCommit();
if (state == State.RELOAD) {
- Preconditions.assertTrue(stateMachine.getLifeCycleState() ==
LifeCycle.State.PAUSED);
-
- stateMachine.reinitialize();
-
- SnapshotInfo snapshot = stateMachine.getLatestSnapshot();
- Preconditions.assertTrue(snapshot != null && snapshot.getIndex() >
lastAppliedIndex,
- "Snapshot: %s, lastAppliedIndex: %s", snapshot,
lastAppliedIndex);
-
- lastAppliedIndex = snapshot.getIndex();
- lastSnapshotIndex = snapshot.getIndex();
- state = State.RUNNING;
+ reload();
}
- final MemoizedSupplier<List<CompletableFuture<Message>>> futures
- = MemoizedSupplier.valueOf(() -> new ArrayList<>());
- while (lastAppliedIndex < committedIndex) {
- final long nextIndex = lastAppliedIndex + 1;
- final LogEntryProto next = raftLog.get(nextIndex);
- if (next != null) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("{}: applying nextIndex={}, nextLog={}",
- this, nextIndex, ServerProtoUtils.toString(next));
- }
- final CompletableFuture<Message> f =
server.applyLogToStateMachine(next);
- if (f != null) {
- futures.get().add(f);
- }
- lastAppliedIndex = nextIndex;
- } else {
- LOG.debug("{}: logEntry {} is null. There may be snapshot to load.
state:{}",
- this, nextIndex, state);
- break;
- }
- }
+ final MemoizedSupplier<List<CompletableFuture<Message>>> futures =
applyLog();
// check if need to trigger a snapshot
if (shouldTakeSnapshot()) {
if (futures.isInitialized()) {
JavaUtils.allOf(futures.get()).get();
}
- lastSnapshotIndex = stateMachine.takeSnapshot();
- raftLog.purge(lastSnapshotIndex);
+
+ takeSnapshot();
}
if (shouldStop()) {
stop();
}
} catch (InterruptedException e) {
- if (!isRunning()) {
+ if (state == State.STOP) {
LOG.info("{}: the StateMachineUpdater is interrupted and will
exit.", this);
} else {
final String s = this + ": the StateMachineUpdater is wrongly
interrupted";
@@ -204,24 +166,87 @@ class StateMachineUpdater implements Runnable {
}
}
- private boolean isRunning() {
- return state != State.STOP;
+ private synchronized void waitForCommit() throws InterruptedException {
+ // When a peer starts, the committed is initialized to 0.
+ // It will be updated only after the leader contacts other peers.
+ // Thus it is possible to have applied > committed initially.
+ final long applied = getLastAppliedIndex();
+ for(; applied >= raftLog.getLastCommittedIndex() && state == State.RUNNING
&& !shouldStop(); ) {
+ wait();
+ }
+ }
+
+ private void reload() throws IOException {
+ Preconditions.assertTrue(stateMachine.getLifeCycleState() ==
LifeCycle.State.PAUSED);
+
+ stateMachine.reinitialize();
+
+ final SnapshotInfo snapshot = stateMachine.getLatestSnapshot();
+ Objects.requireNonNull(snapshot, "snapshot == null");
+ final long i = snapshot.getIndex();
+ snapshotIndex.setUnconditionally(i, infoIndexChange);
+ appliedIndex.setUnconditionally(i, infoIndexChange);
+ state = State.RUNNING;
+ }
+
+ private MemoizedSupplier<List<CompletableFuture<Message>>> applyLog() throws
RaftLogIOException {
+ final MemoizedSupplier<List<CompletableFuture<Message>>> futures =
MemoizedSupplier.valueOf(ArrayList::new);
+ final long committed = raftLog.getLastCommittedIndex();
+ for(long applied; (applied = getLastAppliedIndex()) < committed && state
== State.RUNNING && !shouldStop(); ) {
+ final long nextIndex = applied + 1;
+ final LogEntryProto next = raftLog.get(nextIndex);
+ if (next != null) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("{}: applying nextIndex={}, nextLog={}", this, nextIndex,
ServerProtoUtils.toString(next));
+ } else {
+ LOG.debug("{}: applying nextIndex={}", this, nextIndex);
+ }
+
+ final CompletableFuture<Message> f =
server.applyLogToStateMachine(next);
+ if (f != null) {
+ futures.get().add(f);
+ }
+ final long incremented =
appliedIndex.incrementAndGet(debugIndexChange);
+ Preconditions.assertTrue(incremented == nextIndex);
+ } else {
+ LOG.debug("{}: logEntry {} is null. There may be snapshot to load.
state:{}",
+ this, nextIndex, state);
+ break;
+ }
+ }
+ return futures;
+ }
+
+ private void takeSnapshot() {
+ final long i;
+ try {
+ i = stateMachine.takeSnapshot();
+ } catch (IOException e) {
+ LOG.error(name + ": Failed to take snapshot", e);
+ return;
+ }
+
+ if (i >= 0) {
+ LOG.info("{}: Took a snapshot at index {}", name, i);
+ snapshotIndex.updateIncreasingly(i, infoIndexChange);
+ raftLog.purge(i);
+ }
}
private boolean shouldStop() {
- return stopIndex != null && getLastAppliedIndex() >= stopIndex;
+ return Optional.ofNullable(stopIndex.get()).filter(i -> i <=
getLastAppliedIndex()).isPresent();
}
private boolean shouldTakeSnapshot() {
- return autoSnapshotEnabled &&
- ( ((state != State.RELOAD)
- && (getLastAppliedIndex() - lastSnapshotIndex
- >= autoSnapshotThreshold))
- || shouldStop()
- );
+ if (autoSnapshotThreshold == null) {
+ return false;
+ } else if (shouldStop()) {
+ return true;
+ }
+ return state == State.RUNNING && getLastAppliedIndex() -
snapshotIndex.get() >= autoSnapshotThreshold;
}
long getLastAppliedIndex() {
- return lastAppliedIndex;
+ return appliedIndex.get();
}
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLog.java
index b7d2d5c..bc35014 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLog.java
@@ -319,6 +319,7 @@ public abstract class RaftLog implements RaftLogSequentialOps, Closeable {
* @param index - is inclusive.
*/
public final CompletableFuture<Long> purge(long index) {
+ LOG.info("{}: purge {}", getName(), index);
return purgeImpl(index);
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogIndex.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogIndex.java
index 1c42d84..1241c5b 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogIndex.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogIndex.java
@@ -71,6 +71,13 @@ public class RaftLogIndex {
return updated;
}
+ public long incrementAndGet(Consumer<Object> log) {
+ final long newIndex = index.incrementAndGet();
+ log.accept(StringUtils.stringSupplierAsObject(
+ () -> name + ": incrementAndGet " + (newIndex-1) + " -> " + newIndex));
+ return newIndex;
+ }
+
@Override
public String toString() {
return name + ":" + index;
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java
index 3626755..bbe2e1c 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java
@@ -197,12 +197,6 @@ public class MemoryRaftLog extends RaftLog {
}
}
- @Override
- public String toString() {
- return "last=" + getLastEntryTermIndex() + ", committed="
- + ServerProtoUtils.toString(get(getLastCommittedIndex()));
- }
-
public String getEntryString() {
return "entries=" + entries;
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorage.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorage.java
index 35fcaaf..81a9de2 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorage.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorage.java
@@ -22,8 +22,6 @@ import org.apache.ratis.server.impl.RaftConfiguration;
import org.apache.ratis.server.impl.RaftServerConstants;
import org.apache.ratis.server.impl.ServerProtoUtils;
import org.apache.ratis.server.storage.RaftStorageDirectory.StorageState;
-import org.apache.ratis.statemachine.SnapshotInfo;
-import org.apache.ratis.statemachine.StateMachineStorage;
import org.apache.ratis.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -42,7 +40,6 @@ public class RaftStorage implements Closeable {
private final RaftStorageDirectory storageDir;
private final StorageState state;
private volatile MetaFile metaFile;
- private StateMachineStorage stateMachineStorage;
public RaftStorage(File dir, RaftServerConstants.StartupOption option)
throws IOException {
@@ -140,23 +137,8 @@ public class RaftStorage implements Closeable {
}
}
- public SnapshotInfo getLatestSnapshot() throws IOException {
- return getStateMachineStorage().getLatestSnapshot();
- }
-
- /**
- * Called by the state machine after it has initialized the
StateMachineStorage.
- */
- public void setStateMachineStorage(StateMachineStorage smStorage) {
- this.stateMachineStorage = smStorage;
- }
-
- public StateMachineStorage getStateMachineStorage() {
- return stateMachineStorage;
- }
-
@Override
public String toString() {
- return getStorageDir() + "";
+ return getClass().getSimpleName() + ":" + getStorageDir();
}
}