Repository: incubator-ratis
Updated Branches:
  refs/heads/master 42fff2b26 -> 24f5cc769
RATIS-62. Return the Exception from StateMachine#preAppendTransaction to client as StateMachineException. Contributed by Jing Zhao.

Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/24f5cc76
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/24f5cc76
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/24f5cc76

Branch: refs/heads/master
Commit: 24f5cc769f6db591d9baa105579ba41725943fe8
Parents: 42fff2b
Author: Jing Zhao <[email protected]>
Authored: Fri Mar 31 15:16:54 2017 -0700
Committer: Jing Zhao <[email protected]>
Committed: Fri Mar 31 15:16:54 2017 -0700

----------------------------------------------------------------------
 .../TestRaftStateMachineException.java | 19 ++++++-------------
 .../apache/ratis/server/impl/RaftServerImpl.java | 17 ++++++++---------
 .../apache/ratis/server/impl/ServerState.java | 3 ++-
 .../org/apache/ratis/server/storage/RaftLog.java | 9 +++++++--
 4 files changed, 23 insertions(+), 25 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/24f5cc76/ratis-examples/src/test/java/org/apache/ratis/statemachine/TestRaftStateMachineException.java
----------------------------------------------------------------------
diff --git a/ratis-examples/src/test/java/org/apache/ratis/statemachine/TestRaftStateMachineException.java b/ratis-examples/src/test/java/org/apache/ratis/statemachine/TestRaftStateMachineException.java
index d8a32f6..5ce10e2 100644
--- a/ratis-examples/src/test/java/org/apache/ratis/statemachine/TestRaftStateMachineException.java
+++ b/ratis-examples/src/test/java/org/apache/ratis/statemachine/TestRaftStateMachineException.java
@@ -72,7 +72,7 @@ public class TestRaftStateMachineException {
     public TransactionContext preAppendTransaction(TransactionContext trx)
         throws IOException {
       if (failPreAppend) {
-        throw new 
IOException("Fake Exception"); + throw new IOException("Fake Exception in preAppend"); } else { return trx; } @@ -170,12 +170,8 @@ public class TestRaftStateMachineException { final long callId = 999; RaftClientRequest r = new RaftClientRequest(client.getId(), leaderId, callId, new RaftTestUtil.SimpleMessage("message")); - try { - rpc.sendRequest(r); - Assert.fail("Exception expected"); - } catch (Exception e) { - e.printStackTrace(); - } + RaftClientReply reply = rpc.sendRequest(r); + Assert.assertTrue(reply.hasStateMachineException()); RetryCache.CacheEntry oldEntry = RaftServerTestUtil.getRetryEntry( cluster.getLeader(), client.getId(), callId); @@ -183,12 +179,9 @@ public class TestRaftStateMachineException { Assert.assertTrue(RaftServerTestUtil.isRetryCacheEntryFailed(oldEntry)); // retry - try { - rpc.sendRequest(r); - Assert.fail("Exception expected"); - } catch (Exception e) { - e.printStackTrace(); - } + reply = rpc.sendRequest(r); + Assert.assertTrue(reply.hasStateMachineException()); + RetryCache.CacheEntry currentEntry = RaftServerTestUtil.getRetryEntry( cluster.getLeader(), client.getId(), callId); Assert.assertNotNull(currentEntry); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/24f5cc76/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java index ca1b6d3..6a1fbd4 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -386,14 +386,14 @@ public class RaftServerImpl implements RaftServer { */ private CompletableFuture<RaftClientReply> appendTransaction( RaftClientRequest request, TransactionContext context, - RetryCache.CacheEntry retryEntry) throws RaftException { + 
RetryCache.CacheEntry cacheEntry) throws RaftException { LOG.debug("{}: receive client request({})", getId(), request); lifeCycle.assertCurrentState(RUNNING); CompletableFuture<RaftClientReply> reply; final PendingRequest pending; synchronized (this) { - reply = checkLeaderState(request, retryEntry); + reply = checkLeaderState(request, cacheEntry); if (reply != null) { return reply; } @@ -403,13 +403,12 @@ public class RaftServerImpl implements RaftServer { try { entryIndex = state.applyLog(context, request.getClientId(), request.getCallId()); - } catch (IOException e) { - // TODO looks like the IOException is actually only thrown by the SM in - // the preAppend stage. In that case we should wrap the exception in - // StateMachineException and return the exception in a RaftClientReply. - RaftException re = new RaftException(e); - retryEntry.failWithException(re); - throw re; + } catch (StateMachineException e) { + // the StateMachineException is thrown by the SM in the preAppend stage. + // Return the exception in a RaftClientReply. 
+ RaftClientReply exceptionReply = new RaftClientReply(request, e); + cacheEntry.failWithReply(exceptionReply); + return CompletableFuture.completedFuture(exceptionReply); } // put the request into the pending queue http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/24f5cc76/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java index 5cd0ee9..ff75237 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java @@ -20,6 +20,7 @@ package org.apache.ratis.server.impl; import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.protocol.ClientId; import org.apache.ratis.protocol.RaftPeerId; +import org.apache.ratis.protocol.StateMachineException; import org.apache.ratis.server.RaftServerConfigKeys; import org.apache.ratis.server.protocol.TermIndex; import org.apache.ratis.server.storage.*; @@ -196,7 +197,7 @@ public class ServerState implements Closeable { } long applyLog(TransactionContext operation, ClientId clientId, long callId) - throws IOException { + throws StateMachineException { return log.append(currentTerm, operation, clientId, callId); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/24f5cc76/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java index 243da73..77e554b 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java @@ 
-24,6 +24,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.ratis.protocol.ClientId; import org.apache.ratis.protocol.RaftPeerId; +import org.apache.ratis.protocol.StateMachineException; import org.apache.ratis.server.impl.ConfigurationManager; import org.apache.ratis.server.impl.RaftConfiguration; import org.apache.ratis.server.impl.RaftServerConstants; @@ -126,14 +127,18 @@ public abstract class RaftLog implements Closeable { * @return the index of the new log entry. */ public long append(long term, TransactionContext operation, - ClientId clientId, long callId) throws IOException { + ClientId clientId, long callId) throws StateMachineException { checkLogState(); try(AutoCloseableLock writeLock = writeLock()) { final long nextIndex = getNextIndex(); // This is called here to guarantee strict serialization of callback executions in case // the SM wants to attach a logic depending on ordered execution in the log commit order. - operation = operation.preAppendTransaction(); + try { + operation = operation.preAppendTransaction(); + } catch (IOException e) { + throw new StateMachineException(selfId.toString(), e); + } // build the log entry after calling the StateMachine final LogEntryProto e = ProtoUtils.toLogEntryProto(
