Move TermIndexTracker to test.

Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/8dbb64fb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/8dbb64fb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/8dbb64fb

Branch: refs/heads/master
Commit: 8dbb64fbae8f85170b4e1494cc9b1d4fcd98b513
Parents: eaadf8e
Author: Tsz-Wo Nicholas Sze <[email protected]>
Authored: Thu Jan 5 20:38:48 2017 +0800
Committer: Tsz-Wo Nicholas Sze <[email protected]>
Committed: Thu Jan 5 20:38:48 2017 +0800

----------------------------------------------------------------------
 .../arithmetic/ArithmeticStateMachine.java      | 20 ++++--
 .../raft/statemachine/TermIndexTracker.java     | 66 --------------------
 .../SimpleStateMachine4Testing.java             | 11 ++--
 .../raft/statemachine/TermIndexTracker.java     | 66 ++++++++++++++++++++
 4 files changed, 86 insertions(+), 77 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8dbb64fb/raft-examples/src/main/java/org/apache/raft/examples/arithmetic/ArithmeticStateMachine.java
----------------------------------------------------------------------
diff --git 
a/raft-examples/src/main/java/org/apache/raft/examples/arithmetic/ArithmeticStateMachine.java
 
b/raft-examples/src/main/java/org/apache/raft/examples/arithmetic/ArithmeticStateMachine.java
index b684669..91776eb 100644
--- 
a/raft-examples/src/main/java/org/apache/raft/examples/arithmetic/ArithmeticStateMachine.java
+++ 
b/raft-examples/src/main/java/org/apache/raft/examples/arithmetic/ArithmeticStateMachine.java
@@ -17,6 +17,7 @@
  */
 package org.apache.raft.examples.arithmetic;
 
+import com.google.common.base.Preconditions;
 import org.apache.raft.conf.RaftProperties;
 import org.apache.raft.examples.arithmetic.expression.Expression;
 import org.apache.raft.protocol.Message;
@@ -37,6 +38,7 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 public class ArithmeticStateMachine extends BaseStateMachine {
@@ -45,7 +47,7 @@ public class ArithmeticStateMachine extends BaseStateMachine {
   private final Map<String, Double> variables = new ConcurrentHashMap<>();
 
   private final SimpleStateMachineStorage storage = new 
SimpleStateMachineStorage();
-  private final TermIndexTracker termIndexTracker = new TermIndexTracker();
+  private final AtomicReference<TermIndex> latestTermIndex = new 
AtomicReference<>();
 
   private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
 
@@ -59,7 +61,7 @@ public class ArithmeticStateMachine extends BaseStateMachine {
 
   void reset() {
     variables.clear();
-    termIndexTracker.reset();
+    latestTermIndex.set(null);
   }
 
   @Override
@@ -83,7 +85,7 @@ public class ArithmeticStateMachine extends BaseStateMachine {
     final TermIndex last;
     try(final AutoCloseableLock readLock = readLock()) {
       copy = new HashMap<>(variables);
-      last = termIndexTracker.getLatestTermIndex();
+      last = latestTermIndex.get();
     }
 
     File snapshotFile =  new 
File(SimpleStateMachineStorage.getSnapshotFileName(
@@ -118,7 +120,7 @@ public class ArithmeticStateMachine extends 
BaseStateMachine {
       if (reload) {
         reset();
       }
-      termIndexTracker.init(last);
+      latestTermIndex.set(last);
       variables.putAll((Map<String, Double>) in.readObject());
     } catch (ClassNotFoundException e) {
       throw new IllegalStateException(e);
@@ -162,10 +164,18 @@ public class ArithmeticStateMachine extends 
BaseStateMachine {
     final Double result;
     try(final AutoCloseableLock writeLock = writeLock()) {
       result = assignment.evaluate(variables);
-      termIndexTracker.update(new TermIndex(entry.getTerm(), index));
+      updateLatestTermIndex(entry.getTerm(), index);
     }
     final Expression r = Expression.Utils.double2Expression(result);
     LOG.debug("{}: {} = {}, variables={}", index, assignment, r, variables);
     return CompletableFuture.completedFuture(Expression.Utils.toMessage(r));
   }
+
+  private void updateLatestTermIndex(long term, long index) {
+    final TermIndex newTI = new TermIndex(term, index);
+    final TermIndex oldTI = latestTermIndex.getAndSet(newTI);
+    if (oldTI != null) {
+      Preconditions.checkArgument(newTI.compareTo(oldTI) >= 0);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8dbb64fb/raft-server/src/main/java/org/apache/raft/statemachine/TermIndexTracker.java
----------------------------------------------------------------------
diff --git 
a/raft-server/src/main/java/org/apache/raft/statemachine/TermIndexTracker.java 
b/raft-server/src/main/java/org/apache/raft/statemachine/TermIndexTracker.java
deleted file mode 100644
index 694eef4..0000000
--- 
a/raft-server/src/main/java/org/apache/raft/statemachine/TermIndexTracker.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.raft.statemachine;
-
-import com.google.common.base.Preconditions;
-import org.apache.raft.server.protocol.TermIndex;
-
-import static 
org.apache.raft.server.impl.RaftServerConstants.INVALID_LOG_INDEX;
-
-/**
- * Tracks the term index that is applied to the StateMachine for simple state 
machines with
- * no concurrent snapshoting capabilities.
- */
-public class TermIndexTracker {
-  static final TermIndex INIT_TERMINDEX =
-      new TermIndex(INVALID_LOG_INDEX, INVALID_LOG_INDEX);
-
-  private TermIndex current = INIT_TERMINDEX;
-
-  //TODO: developer note: everything is synchronized for now for convenience.
-
-  /**
-   * Initialize the tracker with a term index (likely from a snapshot).
-   */
-  public synchronized void init(TermIndex termIndex) {
-    this.current = termIndex;
-  }
-
-  public synchronized void reset() {
-    init(INIT_TERMINDEX);
-  }
-
-  /**
-   * Update the tracker with a new TermIndex. It means that the StateMachine 
has
-   * this index in memory.
-   */
-  public synchronized void update(TermIndex termIndex) {
-    Preconditions.checkArgument(termIndex != null &&
-        termIndex.compareTo(current) >= 0);
-    this.current = termIndex;
-  }
-
-  /**
-   * Return latest term and index that is inserted to this tracker as an atomic
-   * entity.
-   */
-  public synchronized TermIndex getLatestTermIndex() {
-    return current;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8dbb64fb/raft-server/src/test/java/org/apache/raft/statemachine/SimpleStateMachine4Testing.java
----------------------------------------------------------------------
diff --git 
a/raft-server/src/test/java/org/apache/raft/statemachine/SimpleStateMachine4Testing.java
 
b/raft-server/src/test/java/org/apache/raft/statemachine/SimpleStateMachine4Testing.java
index 3e0ae15..d11bfd4 100644
--- 
a/raft-server/src/test/java/org/apache/raft/statemachine/SimpleStateMachine4Testing.java
+++ 
b/raft-server/src/test/java/org/apache/raft/statemachine/SimpleStateMachine4Testing.java
@@ -68,15 +68,14 @@ public class SimpleStateMachine4Testing extends 
BaseStateMachine {
   private final List<LogEntryProto> list =
       Collections.synchronizedList(new ArrayList<>());
   private final Daemon checkpointer;
+  private final SimpleStateMachineStorage storage = new 
SimpleStateMachineStorage();
+  private final TermIndexTracker termIndexTracker = new TermIndexTracker();
+  private final RaftProperties properties = new RaftProperties();
+
   private volatile boolean running = true;
   private long endIndexLastCkpt = RaftServerConstants.INVALID_LOG_INDEX;
-  private SimpleStateMachineStorage storage;
-  private TermIndexTracker termIndexTracker;
-  private final RaftProperties properties = new RaftProperties();
 
-  public SimpleStateMachine4Testing() {
-    this.storage  = new SimpleStateMachineStorage();
-    this.termIndexTracker = new TermIndexTracker();
+  SimpleStateMachine4Testing() {
     checkpointer = new Daemon(() -> {
       while (running) {
         try {

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8dbb64fb/raft-server/src/test/java/org/apache/raft/statemachine/TermIndexTracker.java
----------------------------------------------------------------------
diff --git 
a/raft-server/src/test/java/org/apache/raft/statemachine/TermIndexTracker.java 
b/raft-server/src/test/java/org/apache/raft/statemachine/TermIndexTracker.java
new file mode 100644
index 0000000..fa9c130
--- /dev/null
+++ 
b/raft-server/src/test/java/org/apache/raft/statemachine/TermIndexTracker.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.raft.statemachine;
+
+import com.google.common.base.Preconditions;
+import org.apache.raft.server.protocol.TermIndex;
+
+import static 
org.apache.raft.server.impl.RaftServerConstants.INVALID_LOG_INDEX;
+
+/**
+ * Tracks the term index that is applied to the StateMachine for simple state 
machines with
+ * no concurrent snapshoting capabilities.
+ */
+class TermIndexTracker {
+  static final TermIndex INIT_TERMINDEX =
+      new TermIndex(INVALID_LOG_INDEX, INVALID_LOG_INDEX);
+
+  private TermIndex current = INIT_TERMINDEX;
+
+  //TODO: developer note: everything is synchronized for now for convenience.
+
+  /**
+   * Initialize the tracker with a term index (likely from a snapshot).
+   */
+  public synchronized void init(TermIndex termIndex) {
+    this.current = termIndex;
+  }
+
+  public synchronized void reset() {
+    init(INIT_TERMINDEX);
+  }
+
+  /**
+   * Update the tracker with a new TermIndex. It means that the StateMachine 
has
+   * this index in memory.
+   */
+  public synchronized void update(TermIndex termIndex) {
+    Preconditions.checkArgument(termIndex != null &&
+        termIndex.compareTo(current) >= 0);
+    this.current = termIndex;
+  }
+
+  /**
+   * Return latest term and index that is inserted to this tracker as an atomic
+   * entity.
+   */
+  public synchronized TermIndex getLatestTermIndex() {
+    return current;
+  }
+
+}

Reply via email to