Repository: incubator-ratis
Updated Branches:
  refs/heads/master 6b63986de -> a1edeabea


RATIS-439. TestSegmentedRaftLog is failing.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/a1edeabe
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/a1edeabe
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/a1edeabe

Branch: refs/heads/master
Commit: a1edeabea016ff7c55d4459fdb96c70a0ff132c7
Parents: 6b63986
Author: Tsz Wo Nicholas Sze <[email protected]>
Authored: Fri Nov 30 14:02:44 2018 -0800
Committer: Tsz Wo Nicholas Sze <[email protected]>
Committed: Fri Nov 30 14:02:44 2018 -0800

----------------------------------------------------------------------
 .../apache/ratis/util/AutoCloseableLock.java    | 20 ++++--
 .../ratis/util/AutoCloseableReadWriteLock.java  | 68 ++++++++++++++++++++
 .../ratis/server/storage/RaftLogCache.java      | 23 +++++--
 .../ratis/server/storage/SegmentedRaftLog.java  | 29 +++++----
 .../ratis/server/storage/TestCacheEviction.java |  4 +-
 .../server/storage/TestSegmentedRaftLog.java    | 11 +++-
 6 files changed, 127 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a1edeabe/ratis-common/src/main/java/org/apache/ratis/util/AutoCloseableLock.java
----------------------------------------------------------------------
diff --git 
a/ratis-common/src/main/java/org/apache/ratis/util/AutoCloseableLock.java 
b/ratis-common/src/main/java/org/apache/ratis/util/AutoCloseableLock.java
index 489b5cd..05efafc 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/AutoCloseableLock.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/AutoCloseableLock.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -36,22 +36,34 @@ public class AutoCloseableLock implements AutoCloseable {
    * }}</pre>
    */
   public static AutoCloseableLock acquire(final Lock lock) {
+    return acquire(lock, null);
+  }
+
+  public static AutoCloseableLock acquire(final Lock lock, Runnable preUnlock) 
{
     lock.lock();
-    return new AutoCloseableLock(lock);
+    return new AutoCloseableLock(lock, preUnlock);
   }
 
   private final Lock underlying;
   private final AtomicBoolean closed = new AtomicBoolean(false);
+  private final Runnable preUnlock;
 
-  private AutoCloseableLock(Lock underlying) {
+  private AutoCloseableLock(Lock underlying, Runnable preUnlock) {
     this.underlying = underlying;
+    this.preUnlock = preUnlock;
   }
 
   /** Unlock the underlying lock.  This method is idempotent. */
   @Override
   public void close() {
     if (closed.compareAndSet(false, true)) {
-      underlying.unlock();
+      try {
+        if (preUnlock != null) {
+          preUnlock.run();
+        }
+      } finally {
+        underlying.unlock();
+      }
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a1edeabe/ratis-common/src/main/java/org/apache/ratis/util/AutoCloseableReadWriteLock.java
----------------------------------------------------------------------
diff --git 
a/ratis-common/src/main/java/org/apache/ratis/util/AutoCloseableReadWriteLock.java
 
b/ratis-common/src/main/java/org/apache/ratis/util/AutoCloseableReadWriteLock.java
new file mode 100644
index 0000000..810018b
--- /dev/null
+++ 
b/ratis-common/src/main/java/org/apache/ratis/util/AutoCloseableReadWriteLock.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.util;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Consumer;
+
+public class AutoCloseableReadWriteLock {
+  private final Object name;
+  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
+  private final AtomicInteger depth = new AtomicInteger();
+
+  public AutoCloseableReadWriteLock(Object name) {
+    this.name = name;
+  }
+
+  public AutoCloseableLock readLock(StackTraceElement caller, Consumer<String> 
log) {
+    final AutoCloseableLock readLock = 
AutoCloseableLock.acquire(lock.readLock(),
+        () -> logLocking(name, caller, true, false, log));
+
+    logLocking(name, caller, true, true, log);
+    return readLock;
+  }
+
+  public AutoCloseableLock writeLock(StackTraceElement caller, 
Consumer<String> log) {
+    final AutoCloseableLock writeLock = 
AutoCloseableLock.acquire(lock.writeLock(),
+        () -> logLocking(name, caller, false, false, log));
+
+    logLocking(name, caller, false, true, log);
+    return writeLock;
+  }
+
+  private void logLocking(Object name, StackTraceElement caller, boolean read, 
boolean acquire, Consumer<String> log) {
+    if (caller != null && log != null) {
+      final int d = acquire? depth.getAndIncrement(): depth.decrementAndGet();
+      final StringBuilder b = new StringBuilder();
+      for(int i = 0; i < d; i++) {
+        b.append("  ");
+      }
+      if (name != null) {
+        b.append(name).append(": ");
+      }
+      b.append(read? "readLock ": "writeLock ")
+          .append(acquire ? "ACQUIRED ": "RELEASED ")
+          .append(depth).append(" by ");
+      final String className = caller.getClassName();
+      final int i = className.lastIndexOf('.');
+      b.append(i >= 0? className.substring(i + 1): 
className).append(".").append(caller.getMethodName());
+      log.accept(b.toString());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a1edeabe/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogCache.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogCache.java 
b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogCache.java
index f0cab43..1ea11e4 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogCache.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogCache.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -28,13 +28,14 @@ import org.apache.ratis.server.storage.LogSegment.LogRecord;
 import org.apache.ratis.server.storage.RaftStorageDirectory.LogPathAndIndex;
 import org.apache.ratis.proto.RaftProtos.LogEntryProto;
 import org.apache.ratis.util.AutoCloseableLock;
+import org.apache.ratis.util.AutoCloseableReadWriteLock;
+import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.*;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.function.Consumer;
 
 import static 
org.apache.ratis.server.impl.RaftServerConstants.INVALID_LOG_INDEX;
@@ -90,15 +91,23 @@ class RaftLogCache {
   }
 
   static class LogSegmentList {
+    private final Object name;
     private final List<LogSegment> segments = new ArrayList<>();
-    private final ReentrantReadWriteLock lock = new 
ReentrantReadWriteLock(true);
+    private final AutoCloseableReadWriteLock lock;
+
+    LogSegmentList(Object name) {
+      this.name = name;
+      this.lock = new AutoCloseableReadWriteLock(name);
+    }
 
     AutoCloseableLock readLock() {
-      return AutoCloseableLock.acquire(lock.readLock());
+      final StackTraceElement caller = LOG.isTraceEnabled()? 
JavaUtils.getCallerStackTraceElement(): null;
+      return lock.readLock(caller, LOG::trace);
     }
 
     AutoCloseableLock writeLock() {
-      return AutoCloseableLock.acquire(lock.writeLock());
+      final StackTraceElement caller = LOG.isTraceEnabled()? 
JavaUtils.getCallerStackTraceElement(): null;
+      return lock.writeLock(caller, LOG::trace);
     }
 
     boolean isEmpty() {
@@ -241,7 +250,7 @@ class RaftLogCache {
 
   private final String name;
   private volatile LogSegment openSegment;
-  private final LogSegmentList closedSegments = new LogSegmentList();
+  private final LogSegmentList closedSegments;
   private final RaftStorage storage;
 
   private final int maxCachedSegments;
@@ -249,6 +258,7 @@ class RaftLogCache {
 
   RaftLogCache(RaftPeerId selfId, RaftStorage storage, RaftProperties 
properties) {
     this.name = selfId + "-" + getClass().getSimpleName();
+    this.closedSegments = new LogSegmentList(name);
     this.storage = storage;
     maxCachedSegments = 
RaftServerConfigKeys.Log.maxCachedSegmentNum(properties);
   }
@@ -471,6 +481,7 @@ class RaftLogCache {
           for(; i.hasNext(); ) {
             failClientRequest.accept(i.next());
           }
+          break;
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a1edeabe/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java
 
b/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java
index b692957..d23e0a5 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -38,6 +38,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Consumer;
 
@@ -93,7 +94,7 @@ public class SegmentedRaftLog extends RaftLog {
     }
   }
 
-  private final RaftServerImpl server;
+  private final Optional<RaftServerImpl> server;
   private final RaftStorage storage;
   private final RaftLogCache cache;
   private final RaftLogWorker fileLogWorker;
@@ -111,7 +112,7 @@ public class SegmentedRaftLog extends RaftLog {
       StateMachine stateMachine, Runnable submitUpdateCommitEvent,
       RaftStorage storage, long lastIndexInSnapshot, RaftProperties 
properties) {
     super(selfId, lastIndexInSnapshot, 
RaftServerConfigKeys.Log.Appender.bufferCapacity(properties).getSizeInt());
-    this.server = server;
+    this.server = Optional.ofNullable(server);
     this.storage = storage;
     segmentMaxSize = 
RaftServerConfigKeys.Log.segmentSizeMax(properties).getSize();
     cache = new RaftLogCache(selfId, storage, properties);
@@ -200,9 +201,9 @@ public class SegmentedRaftLog extends RaftLog {
     }
 
     try {
-      return new EntryWithData(entry, 
server.getStateMachine().readStateMachineData(entry));
+      return new EntryWithData(entry, server.map(s -> 
s.getStateMachine().readStateMachineData(entry)).orElse(null));
     } catch (Throwable e) {
-      final String err = server.getId() + ": Failed readStateMachineData for " 
+
+      final String err = getSelfId() + ": Failed readStateMachineData for " +
           ServerProtoUtils.toLogEntryString(entry);
       LOG.error(err, e);
       throw new RaftLogIOException(err, 
JavaUtils.unwrapCompletionException(e));
@@ -210,13 +211,12 @@ public class SegmentedRaftLog extends RaftLog {
   }
 
   private void checkAndEvictCache() {
-    if (server != null && cache.shouldEvict()) {
+    if (server.isPresent() && cache.shouldEvict()) {
       // TODO if the cache is hitting the maximum size and we cannot evict any
       // segment's cache, should block the new entry appending or new segment
       // allocation.
-      cache.evictCache(server.getFollowerNextIndices(),
-          fileLogWorker.getFlushedIndex(),
-          server.getState().getLastAppliedIndex());
+      final RaftServerImpl s = server.get();
+      cache.evictCache(s.getFollowerNextIndices(), 
fileLogWorker.getFlushedIndex(), s.getState().getLastAppliedIndex());
     }
   }
 
@@ -266,7 +266,7 @@ public class SegmentedRaftLog extends RaftLog {
   CompletableFuture<Long> appendEntryImpl(LogEntryProto entry) {
     checkLogState();
     if (LOG.isTraceEnabled()) {
-      LOG.trace("{}: appendEntry {}", server.getId(),
+      LOG.trace("{}: appendEntry {}", getSelfId(),
           ServerProtoUtils.toLogEntryString(entry));
     }
     try(AutoCloseableLock writeLock = writeLock()) {
@@ -304,8 +304,7 @@ public class SegmentedRaftLog extends RaftLog {
       }
       return writeFuture;
     } catch (Throwable throwable) {
-      LOG.error(getSelfId() + "exception while appending entry with index:" +
-          entry.getIndex(), throwable);
+      LOG.error(getSelfId() + ": Failed to append " + 
ServerProtoUtils.toLogEntryString(entry), throwable);
       throw throwable;
     }
   }
@@ -323,9 +322,12 @@ public class SegmentedRaftLog extends RaftLog {
   }
 
   private void failClientRequest(TermIndex ti) {
+    if (!server.isPresent()) {
+      return;
+    }
     try {
       final LogEntryProto entry = get(ti.getIndex());
-      server.failClientRequest(entry);
+      server.get().failClientRequest(entry);
     } catch(RaftLogIOException e) {
       LOG.error(getName() + ": Failed to read log " + ti, e);
     }
@@ -341,6 +343,7 @@ public class SegmentedRaftLog extends RaftLog {
       final TruncateIndices ti = 
cache.computeTruncateIndices(this::failClientRequest, entries);
       final long truncateIndex = ti.getTruncateIndex();
       final int index = ti.getArrayIndex();
+      LOG.debug("truncateIndex={}, arrayIndex={}", truncateIndex, index);
 
       final List<CompletableFuture<Long>> futures;
       if (truncateIndex != -1) {

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a1edeabe/ratis-test/src/test/java/org/apache/ratis/server/storage/TestCacheEviction.java
----------------------------------------------------------------------
diff --git 
a/ratis-test/src/test/java/org/apache/ratis/server/storage/TestCacheEviction.java
 
b/ratis-test/src/test/java/org/apache/ratis/server/storage/TestCacheEviction.java
index b34d58f..d84de42 100644
--- 
a/ratis-test/src/test/java/org/apache/ratis/server/storage/TestCacheEviction.java
+++ 
b/ratis-test/src/test/java/org/apache/ratis/server/storage/TestCacheEviction.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -49,7 +49,7 @@ public class TestCacheEviction extends BaseTest {
 
   static LogSegmentList prepareSegments(int numSegments, boolean[] cached, 
long start, long size) {
     Assert.assertEquals(numSegments, cached.length);
-    final LogSegmentList segments = new LogSegmentList();
+    final LogSegmentList segments = new 
LogSegmentList(TestCacheEviction.class.getSimpleName());
     for (int i = 0; i < numSegments; i++) {
       LogSegment s = LogSegment.newCloseSegment(null, start, start + size - 1);
       if (cached[i]) {

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a1edeabe/ratis-test/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java
----------------------------------------------------------------------
diff --git 
a/ratis-test/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java
 
b/ratis-test/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java
index bcbfa73..6e516bc 100644
--- 
a/ratis-test/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java
+++ 
b/ratis-test/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -65,6 +65,8 @@ import static org.mockito.Mockito.when;
 public class TestSegmentedRaftLog extends BaseTest {
   static {
     LogUtils.setLogLevel(RaftLogWorker.LOG, Level.DEBUG);
+    LogUtils.setLogLevel(RaftLogCache.LOG, Level.TRACE);
+    LogUtils.setLogLevel(SegmentedRaftLog.LOG, Level.TRACE);
   }
 
   private static final RaftPeerId peerId = RaftPeerId.valueOf("s0");
@@ -376,7 +378,7 @@ public class TestSegmentedRaftLog extends BaseTest {
     try (SegmentedRaftLog raftLog =
              new SegmentedRaftLog(peerId, server, storage, -1, properties)) {
       raftLog.open(RaftServerConstants.INVALID_LOG_INDEX, null);
-      entries.stream().forEach(entry -> 
RetryCacheTestUtil.createEntry(retryCache, entry));
+      entries.forEach(entry -> RetryCacheTestUtil.createEntry(retryCache, 
entry));
       // append entries to the raftlog
       
entries.stream().map(raftLog::appendEntry).forEach(CompletableFuture::join);
     }
@@ -392,7 +394,10 @@ public class TestSegmentedRaftLog extends BaseTest {
     try (SegmentedRaftLog raftLog =
              new SegmentedRaftLog(peerId, server, storage, -1, properties)) {
       raftLog.open(RaftServerConstants.INVALID_LOG_INDEX, null);
-      raftLog.append(newEntries.toArray(new 
LogEntryProto[newEntries.size()])).forEach(CompletableFuture::join);
+      LOG.info("newEntries[0] = {}", newEntries.get(0));
+      final int last = newEntries.size() - 1;
+      LOG.info("newEntries[{}] = {}", last, newEntries.get(last));
+      raftLog.append(newEntries.toArray(new 
LogEntryProto[0])).forEach(CompletableFuture::join);
 
       checkFailedEntries(entries, 650, retryCache);
       checkEntries(raftLog, entries, 0, 650);

Reply via email to