Repository: incubator-ratis Updated Branches: refs/heads/master 6b63986de -> a1edeabea
RATIS-439. TestSegmentedRaftLog is failing. Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/a1edeabe Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/a1edeabe Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/a1edeabe Branch: refs/heads/master Commit: a1edeabea016ff7c55d4459fdb96c70a0ff132c7 Parents: 6b63986 Author: Tsz Wo Nicholas Sze <[email protected]> Authored: Fri Nov 30 14:02:44 2018 -0800 Committer: Tsz Wo Nicholas Sze <[email protected]> Committed: Fri Nov 30 14:02:44 2018 -0800 ---------------------------------------------------------------------- .../apache/ratis/util/AutoCloseableLock.java | 20 ++++-- .../ratis/util/AutoCloseableReadWriteLock.java | 68 ++++++++++++++++++++ .../ratis/server/storage/RaftLogCache.java | 23 +++++-- .../ratis/server/storage/SegmentedRaftLog.java | 29 +++++---- .../ratis/server/storage/TestCacheEviction.java | 4 +- .../server/storage/TestSegmentedRaftLog.java | 11 +++- 6 files changed, 127 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a1edeabe/ratis-common/src/main/java/org/apache/ratis/util/AutoCloseableLock.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/util/AutoCloseableLock.java b/ratis-common/src/main/java/org/apache/ratis/util/AutoCloseableLock.java index 489b5cd..05efafc 100644 --- a/ratis-common/src/main/java/org/apache/ratis/util/AutoCloseableLock.java +++ b/ratis-common/src/main/java/org/apache/ratis/util/AutoCloseableLock.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -36,22 +36,34 @@ public class AutoCloseableLock implements AutoCloseable { * }}</pre> */ public static AutoCloseableLock acquire(final Lock lock) { + return acquire(lock, null); + } + + public static AutoCloseableLock acquire(final Lock lock, Runnable preUnlock) { lock.lock(); - return new AutoCloseableLock(lock); + return new AutoCloseableLock(lock, preUnlock); } private final Lock underlying; private final AtomicBoolean closed = new AtomicBoolean(false); + private final Runnable preUnlock; - private AutoCloseableLock(Lock underlying) { + private AutoCloseableLock(Lock underlying, Runnable preUnlock) { this.underlying = underlying; + this.preUnlock = preUnlock; } /** Unlock the underlying lock. This method is idempotent. */ @Override public void close() { if (closed.compareAndSet(false, true)) { - underlying.unlock(); + try { + if (preUnlock != null) { + preUnlock.run(); + } + } finally { + underlying.unlock(); + } } } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a1edeabe/ratis-common/src/main/java/org/apache/ratis/util/AutoCloseableReadWriteLock.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/util/AutoCloseableReadWriteLock.java b/ratis-common/src/main/java/org/apache/ratis/util/AutoCloseableReadWriteLock.java new file mode 100644 index 0000000..810018b --- /dev/null +++ b/ratis-common/src/main/java/org/apache/ratis/util/AutoCloseableReadWriteLock.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.util; + +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Consumer; + +public class AutoCloseableReadWriteLock { + private final Object name; + private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true); + private final AtomicInteger depth = new AtomicInteger(); + + public AutoCloseableReadWriteLock(Object name) { + this.name = name; + } + + public AutoCloseableLock readLock(StackTraceElement caller, Consumer<String> log) { + final AutoCloseableLock readLock = AutoCloseableLock.acquire(lock.readLock(), + () -> logLocking(name, caller, true, false, log)); + + logLocking(name, caller, true, true, log); + return readLock; + } + + public AutoCloseableLock writeLock(StackTraceElement caller, Consumer<String> log) { + final AutoCloseableLock writeLock = AutoCloseableLock.acquire(lock.writeLock(), + () -> logLocking(name, caller, false, false, log)); + + logLocking(name, caller, false, true, log); + return writeLock; + } + + private void logLocking(Object name, StackTraceElement caller, boolean read, boolean acquire, Consumer<String> log) { + if (caller != null && log != null) { + final int d = acquire? depth.getAndIncrement(): depth.decrementAndGet(); + final StringBuilder b = new StringBuilder(); + for(int i = 0; i < d; i++) { + b.append(" "); + } + if (name != null) { + b.append(name).append(": "); + } + b.append(read? "readLock ": "writeLock ") + .append(acquire ? "ACQUIRED ": "RELEASED ") + .append(depth).append(" by "); + final String className = caller.getClassName(); + final int i = className.lastIndexOf('.'); + b.append(i >= 0? className.substring(i + 1): className).append(".").append(caller.getMethodName()); + log.accept(b.toString()); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a1edeabe/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogCache.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogCache.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogCache.java index f0cab43..1ea11e4 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogCache.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogCache.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -28,13 +28,14 @@ import org.apache.ratis.server.storage.LogSegment.LogRecord; import org.apache.ratis.server.storage.RaftStorageDirectory.LogPathAndIndex; import org.apache.ratis.proto.RaftProtos.LogEntryProto; import org.apache.ratis.util.AutoCloseableLock; +import org.apache.ratis.util.AutoCloseableReadWriteLock; +import org.apache.ratis.util.JavaUtils; import org.apache.ratis.util.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.*; -import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Consumer; import static org.apache.ratis.server.impl.RaftServerConstants.INVALID_LOG_INDEX; @@ -90,15 +91,23 @@ class RaftLogCache { } static class LogSegmentList { + private final Object name; private final List<LogSegment> segments = new ArrayList<>(); - private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true); + private final AutoCloseableReadWriteLock lock; + + LogSegmentList(Object name) { + this.name = name; + this.lock = new AutoCloseableReadWriteLock(name); + } AutoCloseableLock readLock() { - return AutoCloseableLock.acquire(lock.readLock()); + final StackTraceElement caller = LOG.isTraceEnabled()? JavaUtils.getCallerStackTraceElement(): null; + return lock.readLock(caller, LOG::trace); } AutoCloseableLock writeLock() { - return AutoCloseableLock.acquire(lock.writeLock()); + final StackTraceElement caller = LOG.isTraceEnabled()? JavaUtils.getCallerStackTraceElement(): null; + return lock.writeLock(caller, LOG::trace); } boolean isEmpty() { @@ -241,7 +250,7 @@ class RaftLogCache { private final String name; private volatile LogSegment openSegment; - private final LogSegmentList closedSegments = new LogSegmentList(); + private final LogSegmentList closedSegments; private final RaftStorage storage; private final int maxCachedSegments; @@ -249,6 +258,7 @@ class RaftLogCache { RaftLogCache(RaftPeerId selfId, RaftStorage storage, RaftProperties properties) { this.name = selfId + "-" + getClass().getSimpleName(); + this.closedSegments = new LogSegmentList(name); this.storage = storage; maxCachedSegments = RaftServerConfigKeys.Log.maxCachedSegmentNum(properties); } @@ -471,6 +481,7 @@ class RaftLogCache { for(; i.hasNext(); ) { failClientRequest.accept(i.next()); } + break; } } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a1edeabe/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java index b692957..d23e0a5 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -38,6 +38,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.function.Consumer; @@ -93,7 +94,7 @@ public class SegmentedRaftLog extends RaftLog { } } - private final RaftServerImpl server; + private final Optional<RaftServerImpl> server; private final RaftStorage storage; private final RaftLogCache cache; private final RaftLogWorker fileLogWorker; @@ -111,7 +112,7 @@ public class SegmentedRaftLog extends RaftLog { StateMachine stateMachine, Runnable submitUpdateCommitEvent, RaftStorage storage, long lastIndexInSnapshot, RaftProperties properties) { super(selfId, lastIndexInSnapshot, RaftServerConfigKeys.Log.Appender.bufferCapacity(properties).getSizeInt()); - this.server = server; + this.server = Optional.ofNullable(server); this.storage = storage; segmentMaxSize = RaftServerConfigKeys.Log.segmentSizeMax(properties).getSize(); cache = new RaftLogCache(selfId, storage, properties); @@ -200,9 +201,9 @@ public class SegmentedRaftLog extends RaftLog { } try { - return new EntryWithData(entry, server.getStateMachine().readStateMachineData(entry)); + return new EntryWithData(entry, server.map(s -> s.getStateMachine().readStateMachineData(entry)).orElse(null)); } catch (Throwable e) { - final String err = server.getId() + ": Failed readStateMachineData for " + + final String err = getSelfId() + ": Failed readStateMachineData for " + ServerProtoUtils.toLogEntryString(entry); LOG.error(err, e); throw new RaftLogIOException(err, JavaUtils.unwrapCompletionException(e)); @@ -210,13 +211,12 @@ public class SegmentedRaftLog extends RaftLog { } private void checkAndEvictCache() { - if (server != null && cache.shouldEvict()) { + if (server.isPresent() && cache.shouldEvict()) { // TODO if the cache is hitting the maximum size and we cannot evict any // segment's cache, should block the new entry appending or new segment // allocation. - cache.evictCache(server.getFollowerNextIndices(), - fileLogWorker.getFlushedIndex(), - server.getState().getLastAppliedIndex()); + final RaftServerImpl s = server.get(); + cache.evictCache(s.getFollowerNextIndices(), fileLogWorker.getFlushedIndex(), s.getState().getLastAppliedIndex()); } } @@ -266,7 +266,7 @@ public class SegmentedRaftLog extends RaftLog { CompletableFuture<Long> appendEntryImpl(LogEntryProto entry) { checkLogState(); if (LOG.isTraceEnabled()) { - LOG.trace("{}: appendEntry {}", server.getId(), + LOG.trace("{}: appendEntry {}", getSelfId(), ServerProtoUtils.toLogEntryString(entry)); } try(AutoCloseableLock writeLock = writeLock()) { @@ -304,8 +304,7 @@ public class SegmentedRaftLog extends RaftLog { } return writeFuture; } catch (Throwable throwable) { - LOG.error(getSelfId() + "exception while appending entry with index:" + - entry.getIndex(), throwable); + LOG.error(getSelfId() + ": Failed to append " + ServerProtoUtils.toLogEntryString(entry), throwable); throw throwable; } } @@ -323,9 +322,12 @@ public class SegmentedRaftLog extends RaftLog { } private void failClientRequest(TermIndex ti) { + if (!server.isPresent()) { + return; + } try { final LogEntryProto entry = get(ti.getIndex()); - server.failClientRequest(entry); + server.get().failClientRequest(entry); } catch(RaftLogIOException e) { LOG.error(getName() + ": Failed to read log " + ti, e); } @@ -341,6 +343,7 @@ public class SegmentedRaftLog extends RaftLog { final TruncateIndices ti = cache.computeTruncateIndices(this::failClientRequest, entries); final long truncateIndex = ti.getTruncateIndex(); final int index = ti.getArrayIndex(); + LOG.debug("truncateIndex={}, arrayIndex={}", truncateIndex, index); final List<CompletableFuture<Long>> futures; if (truncateIndex != -1) { http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a1edeabe/ratis-test/src/test/java/org/apache/ratis/server/storage/TestCacheEviction.java ---------------------------------------------------------------------- diff --git a/ratis-test/src/test/java/org/apache/ratis/server/storage/TestCacheEviction.java b/ratis-test/src/test/java/org/apache/ratis/server/storage/TestCacheEviction.java index b34d58f..d84de42 100644 --- a/ratis-test/src/test/java/org/apache/ratis/server/storage/TestCacheEviction.java +++ b/ratis-test/src/test/java/org/apache/ratis/server/storage/TestCacheEviction.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -49,7 +49,7 @@ public class TestCacheEviction extends BaseTest { static LogSegmentList prepareSegments(int numSegments, boolean[] cached, long start, long size) { Assert.assertEquals(numSegments, cached.length); - final LogSegmentList segments = new LogSegmentList(); + final LogSegmentList segments = new LogSegmentList(TestCacheEviction.class.getSimpleName()); for (int i = 0; i < numSegments; i++) { LogSegment s = LogSegment.newCloseSegment(null, start, start + size - 1); if (cached[i]) { http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a1edeabe/ratis-test/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java ---------------------------------------------------------------------- diff --git a/ratis-test/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java b/ratis-test/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java index bcbfa73..6e516bc 100644 --- a/ratis-test/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java +++ b/ratis-test/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -65,6 +65,8 @@ import static org.mockito.Mockito.when; public class TestSegmentedRaftLog extends BaseTest { static { LogUtils.setLogLevel(RaftLogWorker.LOG, Level.DEBUG); + LogUtils.setLogLevel(RaftLogCache.LOG, Level.TRACE); + LogUtils.setLogLevel(SegmentedRaftLog.LOG, Level.TRACE); } private static final RaftPeerId peerId = RaftPeerId.valueOf("s0"); @@ -376,7 +378,7 @@ public class TestSegmentedRaftLog extends BaseTest { try (SegmentedRaftLog raftLog = new SegmentedRaftLog(peerId, server, storage, -1, properties)) { raftLog.open(RaftServerConstants.INVALID_LOG_INDEX, null); - entries.stream().forEach(entry -> RetryCacheTestUtil.createEntry(retryCache, entry)); + entries.forEach(entry -> RetryCacheTestUtil.createEntry(retryCache, entry)); // append entries to the raftlog entries.stream().map(raftLog::appendEntry).forEach(CompletableFuture::join); } @@ -392,7 +394,10 @@ public class TestSegmentedRaftLog extends BaseTest { try (SegmentedRaftLog raftLog = new SegmentedRaftLog(peerId, server, storage, -1, properties)) { raftLog.open(RaftServerConstants.INVALID_LOG_INDEX, null); - raftLog.append(newEntries.toArray(new LogEntryProto[newEntries.size()])).forEach(CompletableFuture::join); + LOG.info("newEntries[0] = {}", newEntries.get(0)); + final int last = newEntries.size() - 1; + LOG.info("newEntries[{}] = {}", last, newEntries.get(last)); + raftLog.append(newEntries.toArray(new LogEntryProto[0])).forEach(CompletableFuture::join); checkFailedEntries(entries, 650, retryCache); checkEntries(raftLog, entries, 0, 650);
