Repository: incubator-ratis Updated Branches: refs/heads/master 4d723a2c7 -> 1d2ebee02
RATIS-379. Allow writing state machine data to be sync'ed with writing raft log. Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/1d2ebee0 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/1d2ebee0 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/1d2ebee0 Branch: refs/heads/master Commit: 1d2ebee02d9e97763936ebd9b8b7cbfded514a1e Parents: 4d723a2 Author: Tsz Wo Nicholas Sze <[email protected]> Authored: Tue Oct 30 13:11:17 2018 +0800 Committer: Tsz Wo Nicholas Sze <[email protected]> Committed: Tue Oct 30 13:11:17 2018 +0800 ---------------------------------------------------------------------- .../java/org/apache/ratis/conf/ConfUtils.java | 4 +- .../ratis/protocol/TimeoutIOException.java | 31 ++++++++ .../java/org/apache/ratis/util/IOUtils.java | 22 ++++- .../ratis/server/RaftServerConfigKeys.java | 24 ++++++ .../ratis/server/impl/RaftServerProxy.java | 4 +- .../ratis/server/storage/RaftLogWorker.java | 46 ++++++----- .../ratis/server/storage/SegmentedRaftLog.java | 11 ++- .../java/org/apache/ratis/RaftTestUtil.java | 11 ++- .../server/storage/TestSegmentedRaftLog.java | 84 ++++++++++++++++++-- .../SimpleStateMachine4Testing.java | 2 + 10 files changed, 205 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/1d2ebee0/ratis-common/src/main/java/org/apache/ratis/conf/ConfUtils.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/conf/ConfUtils.java b/ratis-common/src/main/java/org/apache/ratis/conf/ConfUtils.java index 453ea0e..3ffd8be 100644 --- a/ratis-common/src/main/java/org/apache/ratis/conf/ConfUtils.java +++ b/ratis-common/src/main/java/org/apache/ratis/conf/ConfUtils.java @@ -254,8 +254,8 @@ public interface ConfUtils { } if (printKey(confClass, out, f, "KEY", "DEFAULT", (b, defaultField) -> - b.append(defaultField.getType().getSimpleName()).append(", ") - .append("default=" + defaultField.get(null)))) { + b.append(defaultField.getGenericType().getTypeName()).append(", ") + .append("default=").append(defaultField.get(null)))) { return; } if (printKey(confClass, out, f, "PARAMETER", "CLASS", http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/1d2ebee0/ratis-common/src/main/java/org/apache/ratis/protocol/TimeoutIOException.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/TimeoutIOException.java b/ratis-common/src/main/java/org/apache/ratis/protocol/TimeoutIOException.java new file mode 100644 index 0000000..6effb30 --- /dev/null +++ b/ratis-common/src/main/java/org/apache/ratis/protocol/TimeoutIOException.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.protocol; + +import java.io.IOException; + +/** + * Timeout has occurred for a blocking I/O. + */ +public class TimeoutIOException extends IOException { + static final long serialVersionUID = 1L; + + public TimeoutIOException(String message, Throwable throwable) { + super(message, throwable); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/1d2ebee0/ratis-common/src/main/java/org/apache/ratis/util/IOUtils.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/util/IOUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/IOUtils.java index 8559239..dbb8d20 100644 --- a/ratis-common/src/main/java/org/apache/ratis/util/IOUtils.java +++ b/ratis-common/src/main/java/org/apache/ratis/util/IOUtils.java @@ -20,6 +20,7 @@ package org.apache.ratis.util; +import org.apache.ratis.protocol.TimeoutIOException; import org.slf4j.Logger; import java.io.Closeable; @@ -35,6 +36,8 @@ import java.nio.channels.FileChannel; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; +import java.util.function.Supplier; /** * IO related utility methods. @@ -58,11 +61,11 @@ public interface IOUtils { return cause != null? asIOException(cause): new IOException(e); } - static <T> T getFromFuture(CompletableFuture<T> future, Object name) throws IOException { + static <T> T getFromFuture(CompletableFuture<T> future, Supplier<Object> name) throws IOException { try { return future.get(); } catch (InterruptedException e) { - throw toInterruptedIOException(name + " interrupted.", e); + throw toInterruptedIOException(name.get() + " interrupted.", e); } catch (ExecutionException e) { throw toIOException(e); } catch (CompletionException e) { @@ -70,6 +73,21 @@ public interface IOUtils { } } + static <T> T getFromFuture(CompletableFuture<T> future, Supplier<Object> name, TimeDuration timeout) + throws IOException { + try { + return future.get(timeout.getDuration(), timeout.getUnit()); + } catch (InterruptedException e) { + throw toInterruptedIOException(name.get() + " interrupted.", e); + } catch (ExecutionException e) { + throw toIOException(e); + } catch (CompletionException e) { + throw asIOException(JavaUtils.unwrapCompletionException(e)); + } catch(TimeoutException e) { + throw new TimeoutIOException("Timeout: " + name.get(), e); + } + } + static boolean shouldReconnect(Exception e) { return ReflectionUtils.isInstance(e, SocketException.class, SocketTimeoutException.class, ClosedChannelException.class, EOFException.class); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/1d2ebee0/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java b/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java index 33662c9..22799f2 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java @@ -144,6 +144,30 @@ public interface RaftServerConfigKeys { setInt(properties::setInt, FORCE_SYNC_NUM_KEY, forceSyncNum); } + interface StateMachineData { + String PREFIX = Log.PREFIX + ".statemachine.data"; + + String SYNC_KEY = PREFIX + ".sync"; + boolean SYNC_DEFAULT = true; + static boolean sync(RaftProperties properties) { + return getBoolean(properties::getBoolean, + SYNC_KEY, SYNC_DEFAULT, getDefaultLog()); + } + static void setSync(RaftProperties properties, boolean sync) { + setBoolean(properties::setBoolean, SYNC_KEY, sync); + } + + String SYNC_TIMEOUT_KEY = PREFIX + ".sync.timeout"; + TimeDuration SYNC_TIMEOUT_DEFAULT = TimeDuration.valueOf(10, TimeUnit.SECONDS); + static TimeDuration syncTimeout(RaftProperties properties) { + return getTimeDuration(properties.getTimeDuration(SYNC_TIMEOUT_DEFAULT.getUnit()), + SYNC_TIMEOUT_KEY, SYNC_TIMEOUT_DEFAULT, getDefaultLog()); + } + static void setSyncTimeout(RaftProperties properties, TimeDuration syncTimeout) { + setTimeDuration(properties::setTimeDuration, SYNC_TIMEOUT_KEY, syncTimeout); + } + } + interface Appender { String PREFIX = Log.PREFIX + ".appender"; http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/1d2ebee0/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java index 7648247..a4458f4 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java @@ -263,13 +263,13 @@ public class RaftServerProxy implements RaftServer { public RaftServerImpl getImpl(RaftGroupId groupId) throws IOException { Objects.requireNonNull(groupId, "groupId == null"); - return IOUtils.getFromFuture(getImplFuture(groupId), getId()); + return IOUtils.getFromFuture(getImplFuture(groupId), this::getId); } List<RaftServerImpl> getImpls() throws IOException { final List<RaftServerImpl> list = new ArrayList<>(); for(CompletableFuture<RaftServerImpl> f : impls.getAll()) { - list.add(IOUtils.getFromFuture(f, getId())); + list.add(IOUtils.getFromFuture(f, this::getId)); } return list; } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/1d2ebee0/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java index b5b9ac5..6c64057 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java @@ -24,7 +24,6 @@ import org.apache.ratis.metrics.RatisMetricsRegistry; import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.server.RaftServerConfigKeys; import org.apache.ratis.server.impl.RaftServerConstants; -import org.apache.ratis.server.impl.RaftServerImpl; import org.apache.ratis.server.impl.ServerProtoUtils; import org.apache.ratis.server.storage.RaftLogCache.SegmentFileInfo; import org.apache.ratis.server.storage.RaftLogCache.TruncationSegments; @@ -37,11 +36,11 @@ import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; +import java.util.Optional; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; -import java.util.concurrent.ExecutionException; import java.util.function.Supplier; /** @@ -81,22 +80,26 @@ class RaftLogWorker implements Runnable { private final long preallocatedSize; private final int bufferSize; - RaftLogWorker(RaftPeerId selfId, RaftServerImpl raftServer, RaftStorage storage, - RaftProperties properties) { + private final boolean stateMachineDataSync; + private final TimeDuration stateMachineDataSyncTimeout; + + RaftLogWorker(RaftPeerId selfId, StateMachine stateMachine, Runnable submitUpdateCommitEvent, + RaftStorage storage, RaftProperties properties) { this.name = selfId + "-" + getClass().getSimpleName(); LOG.info("new {} for {}", name, storage); - this.submitUpdateCommitEvent = raftServer != null? raftServer::submitUpdateCommitEvent: () -> {}; - this.stateMachine = raftServer != null? raftServer.getStateMachine(): null; + this.submitUpdateCommitEvent = submitUpdateCommitEvent; + this.stateMachine = stateMachine; this.storage = storage; - this.segmentMaxSize = - RaftServerConfigKeys.Log.segmentSizeMax(properties).getSize(); - this.preallocatedSize = - RaftServerConfigKeys.Log.preallocatedSize(properties).getSize(); - this.bufferSize = - RaftServerConfigKeys.Log.writeBufferSize(properties).getSizeInt(); + this.segmentMaxSize = RaftServerConfigKeys.Log.segmentSizeMax(properties).getSize(); + this.preallocatedSize = RaftServerConfigKeys.Log.preallocatedSize(properties).getSize(); + this.bufferSize = RaftServerConfigKeys.Log.writeBufferSize(properties).getSizeInt(); this.forceSyncNum = RaftServerConfigKeys.Log.forceSyncNum(properties); + + this.stateMachineDataSync = RaftServerConfigKeys.Log.StateMachineData.sync(properties); + this.stateMachineDataSyncTimeout = RaftServerConfigKeys.Log.StateMachineData.syncTimeout(properties); + this.workerThread = new Thread(this, name); // Server Id can be null in unit tests @@ -220,16 +223,19 @@ class RaftLogWorker implements Runnable { private void flushWrites() throws IOException { if (out != null) { - LOG.debug("flush data to " + out + ", reset pending_sync_number to 0"); + LOG.debug("{}: flush {}", name, out); final Timer.Context timerContext = logFlushTimer.get().time(); try { final CompletableFuture<Void> f = stateMachine != null ? stateMachine.flushStateMachineData(lastWrittenIndex) : CompletableFuture.completedFuture(null); + if (stateMachineDataSync) { + IOUtils.getFromFuture(f, () -> name + "-flushStateMachineData", stateMachineDataSyncTimeout); + } out.flush(); - f.get(); - } catch (InterruptedException | ExecutionException e) { - throw IOUtils.asIOException(e); + if (!stateMachineDataSync) { + IOUtils.getFromFuture(f, () -> name + "-flushStateMachineData"); + } } finally { timerContext.stop(); } @@ -238,10 +244,10 @@ class RaftLogWorker implements Runnable { } private void updateFlushedIndex() { - LOG.debug("{}: updateFlushedIndex {} -> {}", name, lastWrittenIndex, flushedIndex); + LOG.debug("{}: updateFlushedIndex {} -> {}", name, flushedIndex, lastWrittenIndex); flushedIndex = lastWrittenIndex; pendingFlushNum = 0; - submitUpdateCommitEvent.run(); + Optional.ofNullable(submitUpdateCommitEvent).ifPresent(Runnable::run); } /** @@ -300,6 +306,10 @@ class RaftLogWorker implements Runnable { @Override public void execute() throws IOException { + if (stateMachineDataSync && stateMachineFuture != null) { + IOUtils.getFromFuture(stateMachineFuture, () -> this + "-writeStateMachineData", stateMachineDataSyncTimeout); + } + Preconditions.assertTrue(out != null); Preconditions.assertTrue(lastWrittenIndex + 1 == entry.getIndex(), "lastWrittenIndex == %s, entry == %s", lastWrittenIndex, entry); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/1d2ebee0/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java index f2b5c1a..666b9c8 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java @@ -27,6 +27,7 @@ import org.apache.ratis.server.storage.LogSegment.LogRecord; import org.apache.ratis.server.storage.LogSegment.LogRecordWithEntry; import org.apache.ratis.server.storage.RaftStorageDirectory.LogPathAndIndex; import org.apache.ratis.proto.RaftProtos.LogEntryProto; +import org.apache.ratis.statemachine.StateMachine; import org.apache.ratis.util.AutoCloseableLock; import org.apache.ratis.util.JavaUtils; import org.apache.ratis.util.Preconditions; @@ -101,13 +102,21 @@ public class SegmentedRaftLog extends RaftLog { public SegmentedRaftLog(RaftPeerId selfId, RaftServerImpl server, RaftStorage storage, long lastIndexInSnapshot, RaftProperties properties) { + this(selfId, server, server != null? server.getStateMachine(): null, + server != null? server::submitUpdateCommitEvent: null, + storage, lastIndexInSnapshot, properties); + } + + SegmentedRaftLog(RaftPeerId selfId, RaftServerImpl server, + StateMachine stateMachine, Runnable submitUpdateCommitEvent, + RaftStorage storage, long lastIndexInSnapshot, RaftProperties properties) { super(selfId, RaftServerConfigKeys.Log.Appender.bufferCapacity(properties) .getSizeInt()); this.server = server; this.storage = storage; segmentMaxSize = RaftServerConfigKeys.Log.segmentSizeMax(properties).getSize(); cache = new RaftLogCache(selfId, storage, properties); - fileLogWorker = new RaftLogWorker(selfId, server, storage, properties); + this.fileLogWorker = new RaftLogWorker(selfId, stateMachine, submitUpdateCommitEvent, storage, properties); lastCommitted.set(lastIndexInSnapshot); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/1d2ebee0/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java index 5946a47..78e3768 100644 --- a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java +++ b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java @@ -280,13 +280,18 @@ public interface RaftTestUtil { private final StateMachineLogEntryProto smLogEntryProto; public SimpleOperation(String op) { - this(clientId, callId.incrementAndGet(), op); + this(op, false); } - private SimpleOperation(ClientId clientId, long callId, String op) { + public SimpleOperation(String op, boolean hasStateMachineData) { + this(clientId, callId.incrementAndGet(), op, hasStateMachineData); + } + + private SimpleOperation(ClientId clientId, long callId, String op, boolean hasStateMachineData) { this.op = Objects.requireNonNull(op); + final ByteString bytes = ProtoUtils.toByteString(op); this.smLogEntryProto = ServerProtoUtils.toStateMachineLogEntryProto( - clientId, callId, ProtoUtils.toByteString(op), null); + clientId, callId, bytes, hasStateMachineData? bytes: null); } @Override http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/1d2ebee0/ratis-server/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java index 5cb498a..8083b62 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java @@ -30,8 +30,11 @@ import org.apache.ratis.server.impl.RaftServerImpl; import org.apache.ratis.server.impl.ServerProtoUtils; import org.apache.ratis.server.protocol.TermIndex; import org.apache.ratis.proto.RaftProtos.LogEntryProto; +import org.apache.ratis.statemachine.SimpleStateMachine4Testing; import org.apache.ratis.util.FileUtils; +import org.apache.ratis.util.JavaUtils; import org.apache.ratis.util.LogUtils; +import org.apache.ratis.util.ProtoUtils; import org.apache.ratis.util.SizeInBytes; import org.junit.After; import org.junit.Assert; @@ -46,6 +49,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; import java.util.function.Supplier; import static org.mockito.Matchers.any; @@ -175,12 +179,18 @@ public class TestSegmentedRaftLog extends BaseTest { Supplier<String> stringSupplier) { List<LogEntryProto> eList = new ArrayList<>(); for (SegmentRange range : slist) { - for (long index = range.start; index <= range.end; index++) { - SimpleOperation m = stringSupplier == null ? - new SimpleOperation("m" + index) : - new SimpleOperation(stringSupplier.get()); - eList.add(ServerProtoUtils.toLogEntryProto(m.getLogEntryContent(), range.term, index)); - } + prepareLogEntries(range, stringSupplier, false, eList); + } + return eList; + } + + List<LogEntryProto> prepareLogEntries(SegmentRange range, + Supplier<String> stringSupplier, boolean hasStataMachineData, List<LogEntryProto> eList) { + for(long index = range.start; index <= range.end; index++) { + SimpleOperation m = stringSupplier == null? + new SimpleOperation("m" + index, hasStataMachineData): + new SimpleOperation(stringSupplier.get(), hasStataMachineData); + eList.add(ServerProtoUtils.toLogEntryProto(m.getLogEntryContent(), range.term, index)); } return eList; } @@ -400,6 +410,60 @@ public class TestSegmentedRaftLog extends BaseTest { } } + @Test + public void testSegmentedRaftLogStateMachineData() throws Exception { + final SegmentRange range = new SegmentRange(0, 10, 1, true); + final List<LogEntryProto> entries = prepareLogEntries(range, null, true, new ArrayList<>()); + + final SimpleStateMachine4Testing sm = new SimpleStateMachine4Testing(); + try (SegmentedRaftLog raftLog = new SegmentedRaftLog(peerId, null, sm, null, storage, -1, properties)) { + raftLog.open(RaftServerConstants.INVALID_LOG_INDEX, null); + + int next = 0; + long flush = -1; + assertIndices(raftLog, flush, next); + raftLog.appendEntry(entries.get(next++)); + assertIndices(raftLog, flush, next); + raftLog.appendEntry(entries.get(next++)); + assertIndices(raftLog, flush, next); + raftLog.appendEntry(entries.get(next++)); + assertIndicesMultipleAttempts(raftLog, flush += 3, next); + + sm.blockFlushStateMachineData(); + raftLog.appendEntry(entries.get(next++)); + { + sm.blockWriteStateMachineData(); + final Thread t = startAppendEntryThread(raftLog, entries.get(next++)); + TimeUnit.SECONDS.sleep(1); + Assert.assertTrue(t.isAlive()); + sm.unblockWriteStateMachineData(); + t.join(); + } + assertIndices(raftLog, flush, next); + TimeUnit.SECONDS.sleep(1); + assertIndices(raftLog, flush, next); + sm.unblockFlushStateMachineData(); + assertIndicesMultipleAttempts(raftLog, flush + 2, next); + } + } + + static Thread startAppendEntryThread(RaftLog raftLog, LogEntryProto entry) { + final Thread t = new Thread(() -> raftLog.appendEntry(entry)); + t.start(); + return t; + } + + void assertIndices(RaftLog raftLog, long expectedFlushIndex, long expectedNextIndex) { + LOG.info("assert expectedFlushIndex={}", expectedFlushIndex); + Assert.assertEquals(expectedFlushIndex, raftLog.getLatestFlushedIndex()); + LOG.info("assert expectedNextIndex={}", expectedNextIndex); + Assert.assertEquals(expectedNextIndex, raftLog.getNextIndex()); + } + + void assertIndicesMultipleAttempts(RaftLog raftLog, long expectedFlushIndex, long expectedNextIndex) throws Exception { + JavaUtils.attempt(() -> assertIndices(raftLog, expectedFlushIndex, expectedNextIndex), + 10, 100, "assertIndices", LOG); + } @Test public void testSegmentedRaftLogFormatInternalHeader() throws Exception { @@ -410,5 +474,13 @@ public class TestSegmentedRaftLog extends BaseTest { LOG.info("header' = " + new String(header, StandardCharsets.UTF_8)); return null; }), IllegalStateException.class); + + // reset the header + SegmentedRaftLogFormat.applyHeaderTo(header -> { + LOG.info("header' = " + new String(header, StandardCharsets.UTF_8)); + header[0] -= 1; // try changing the internal header + LOG.info("header'' = " + new String(header, StandardCharsets.UTF_8)); + return null; + }); } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/1d2ebee0/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java b/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java index 2c6883e..84be87b 100644 --- a/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java +++ b/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java @@ -100,12 +100,14 @@ public class SimpleStateMachine4Testing extends BaseStateMachine { private final EnumMap<Type, CompletableFuture<Void>> maps = new EnumMap<>(Type.class); void block(Type type) { + LOG.info("block {}", type); final CompletableFuture<Void> future = new CompletableFuture<>(); final CompletableFuture<Void> previous = maps.putIfAbsent(type, future); Preconditions.assertNull(previous, "previous"); } void unblock(Type type) { + LOG.info("unblock {}", type); final CompletableFuture<Void> future = maps.remove(type); Objects.requireNonNull(future, "future == null"); future.complete(null);
