Move TermIndexTracker to test.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/8dbb64fb Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/8dbb64fb Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/8dbb64fb Branch: refs/heads/master Commit: 8dbb64fbae8f85170b4e1494cc9b1d4fcd98b513 Parents: eaadf8e Author: Tsz-Wo Nicholas Sze <[email protected]> Authored: Thu Jan 5 20:38:48 2017 +0800 Committer: Tsz-Wo Nicholas Sze <[email protected]> Committed: Thu Jan 5 20:38:48 2017 +0800 ---------------------------------------------------------------------- .../arithmetic/ArithmeticStateMachine.java | 20 ++++-- .../raft/statemachine/TermIndexTracker.java | 66 -------------------- .../SimpleStateMachine4Testing.java | 11 ++-- .../raft/statemachine/TermIndexTracker.java | 66 ++++++++++++++++++++ 4 files changed, 86 insertions(+), 77 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8dbb64fb/raft-examples/src/main/java/org/apache/raft/examples/arithmetic/ArithmeticStateMachine.java ---------------------------------------------------------------------- diff --git a/raft-examples/src/main/java/org/apache/raft/examples/arithmetic/ArithmeticStateMachine.java b/raft-examples/src/main/java/org/apache/raft/examples/arithmetic/ArithmeticStateMachine.java index b684669..91776eb 100644 --- a/raft-examples/src/main/java/org/apache/raft/examples/arithmetic/ArithmeticStateMachine.java +++ b/raft-examples/src/main/java/org/apache/raft/examples/arithmetic/ArithmeticStateMachine.java @@ -17,6 +17,7 @@ */ package org.apache.raft.examples.arithmetic; +import com.google.common.base.Preconditions; import org.apache.raft.conf.RaftProperties; import org.apache.raft.examples.arithmetic.expression.Expression; import org.apache.raft.protocol.Message; @@ -37,6 +38,7 @@ import java.util.HashMap; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReentrantReadWriteLock; public class ArithmeticStateMachine extends BaseStateMachine { @@ -45,7 +47,7 @@ public class ArithmeticStateMachine extends BaseStateMachine { private final Map<String, Double> variables = new ConcurrentHashMap<>(); private final SimpleStateMachineStorage storage = new SimpleStateMachineStorage(); - private final TermIndexTracker termIndexTracker = new TermIndexTracker(); + private final AtomicReference<TermIndex> latestTermIndex = new AtomicReference<>(); private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true); @@ -59,7 +61,7 @@ public class ArithmeticStateMachine extends BaseStateMachine { void reset() { variables.clear(); - termIndexTracker.reset(); + latestTermIndex.set(null); } @Override @@ -83,7 +85,7 @@ public class ArithmeticStateMachine extends BaseStateMachine { final TermIndex last; try(final AutoCloseableLock readLock = readLock()) { copy = new HashMap<>(variables); - last = termIndexTracker.getLatestTermIndex(); + last = latestTermIndex.get(); } File snapshotFile = new File(SimpleStateMachineStorage.getSnapshotFileName( @@ -118,7 +120,7 @@ public class ArithmeticStateMachine extends BaseStateMachine { if (reload) { reset(); } - termIndexTracker.init(last); + latestTermIndex.set(last); variables.putAll((Map<String, Double>) in.readObject()); } catch (ClassNotFoundException e) { throw new IllegalStateException(e); @@ -162,10 +164,18 @@ public class ArithmeticStateMachine extends BaseStateMachine { final Double result; try(final AutoCloseableLock writeLock = writeLock()) { result = assignment.evaluate(variables); - termIndexTracker.update(new TermIndex(entry.getTerm(), index)); + updateLatestTermIndex(entry.getTerm(), index); } final Expression r = Expression.Utils.double2Expression(result); LOG.debug("{}: {} = {}, variables={}", index, assignment, r, variables); return CompletableFuture.completedFuture(Expression.Utils.toMessage(r)); } + + private void updateLatestTermIndex(long term, long index) { + final TermIndex newTI = new TermIndex(term, index); + final TermIndex oldTI = latestTermIndex.getAndSet(newTI); + if (oldTI != null) { + Preconditions.checkArgument(newTI.compareTo(oldTI) >= 0); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8dbb64fb/raft-server/src/main/java/org/apache/raft/statemachine/TermIndexTracker.java ---------------------------------------------------------------------- diff --git a/raft-server/src/main/java/org/apache/raft/statemachine/TermIndexTracker.java b/raft-server/src/main/java/org/apache/raft/statemachine/TermIndexTracker.java deleted file mode 100644 index 694eef4..0000000 --- a/raft-server/src/main/java/org/apache/raft/statemachine/TermIndexTracker.java +++ /dev/null @@ -1,66 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.raft.statemachine; - -import com.google.common.base.Preconditions; -import org.apache.raft.server.protocol.TermIndex; - -import static org.apache.raft.server.impl.RaftServerConstants.INVALID_LOG_INDEX; - -/** - * Tracks the term index that is applied to the StateMachine for simple state machines with - * no concurrent snapshoting capabilities. - */ -public class TermIndexTracker { - static final TermIndex INIT_TERMINDEX = - new TermIndex(INVALID_LOG_INDEX, INVALID_LOG_INDEX); - - private TermIndex current = INIT_TERMINDEX; - - //TODO: developer note: everything is synchronized for now for convenience. - - /** - * Initialize the tracker with a term index (likely from a snapshot). - */ - public synchronized void init(TermIndex termIndex) { - this.current = termIndex; - } - - public synchronized void reset() { - init(INIT_TERMINDEX); - } - - /** - * Update the tracker with a new TermIndex. It means that the StateMachine has - * this index in memory. - */ - public synchronized void update(TermIndex termIndex) { - Preconditions.checkArgument(termIndex != null && - termIndex.compareTo(current) >= 0); - this.current = termIndex; - } - - /** - * Return latest term and index that is inserted to this tracker as an atomic - * entity. - */ - public synchronized TermIndex getLatestTermIndex() { - return current; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8dbb64fb/raft-server/src/test/java/org/apache/raft/statemachine/SimpleStateMachine4Testing.java ---------------------------------------------------------------------- diff --git a/raft-server/src/test/java/org/apache/raft/statemachine/SimpleStateMachine4Testing.java b/raft-server/src/test/java/org/apache/raft/statemachine/SimpleStateMachine4Testing.java index 3e0ae15..d11bfd4 100644 --- a/raft-server/src/test/java/org/apache/raft/statemachine/SimpleStateMachine4Testing.java +++ b/raft-server/src/test/java/org/apache/raft/statemachine/SimpleStateMachine4Testing.java @@ -68,15 +68,14 @@ public class SimpleStateMachine4Testing extends BaseStateMachine { private final List<LogEntryProto> list = Collections.synchronizedList(new ArrayList<>()); private final Daemon checkpointer; + private final SimpleStateMachineStorage storage = new SimpleStateMachineStorage(); + private final TermIndexTracker termIndexTracker = new TermIndexTracker(); + private final RaftProperties properties = new RaftProperties(); + private volatile boolean running = true; private long endIndexLastCkpt = RaftServerConstants.INVALID_LOG_INDEX; - private SimpleStateMachineStorage storage; - private TermIndexTracker termIndexTracker; - private final RaftProperties properties = new RaftProperties(); - public SimpleStateMachine4Testing() { - this.storage = new SimpleStateMachineStorage(); - this.termIndexTracker = new TermIndexTracker(); + SimpleStateMachine4Testing() { checkpointer = new Daemon(() -> { while (running) { try { http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8dbb64fb/raft-server/src/test/java/org/apache/raft/statemachine/TermIndexTracker.java ---------------------------------------------------------------------- diff --git a/raft-server/src/test/java/org/apache/raft/statemachine/TermIndexTracker.java b/raft-server/src/test/java/org/apache/raft/statemachine/TermIndexTracker.java new file mode 100644 index 0000000..fa9c130 --- /dev/null +++ b/raft-server/src/test/java/org/apache/raft/statemachine/TermIndexTracker.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.raft.statemachine; + +import com.google.common.base.Preconditions; +import org.apache.raft.server.protocol.TermIndex; + +import static org.apache.raft.server.impl.RaftServerConstants.INVALID_LOG_INDEX; + +/** + * Tracks the term index that is applied to the StateMachine for simple state machines with + * no concurrent snapshoting capabilities. + */ +class TermIndexTracker { + static final TermIndex INIT_TERMINDEX = + new TermIndex(INVALID_LOG_INDEX, INVALID_LOG_INDEX); + + private TermIndex current = INIT_TERMINDEX; + + //TODO: developer note: everything is synchronized for now for convenience. + + /** + * Initialize the tracker with a term index (likely from a snapshot). + */ + public synchronized void init(TermIndex termIndex) { + this.current = termIndex; + } + + public synchronized void reset() { + init(INIT_TERMINDEX); + } + + /** + * Update the tracker with a new TermIndex. It means that the StateMachine has + * this index in memory. + */ + public synchronized void update(TermIndex termIndex) { + Preconditions.checkArgument(termIndex != null && + termIndex.compareTo(current) >= 0); + this.current = termIndex; + } + + /** + * Return latest term and index that is inserted to this tracker as an atomic + * entity. + */ + public synchronized TermIndex getLatestTermIndex() { + return current; + } + +}
