This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 3f446aa RATIS-705. GrpcClientProtocolClient#close Interrupts itself.
Contributed by Lokesh Jain
3f446aa is described below
commit 3f446aaf27704b0bf929bd39887637a6a71b4418
Author: Tsz Wo Nicholas Sze <[email protected]>
AuthorDate: Fri Oct 11 16:35:38 2019 +0800
RATIS-705. GrpcClientProtocolClient#close Interrupts itself. Contributed
by Lokesh Jain
---
.../org/apache/ratis/util/TimeoutScheduler.java | 9 +++--
.../grpc/client/GrpcClientProtocolClient.java | 2 +-
.../org/apache/ratis/RaftAsyncExceptionTests.java | 39 ++++++++++++++++++++++
3 files changed, 47 insertions(+), 3 deletions(-)
diff --git
a/ratis-common/src/main/java/org/apache/ratis/util/TimeoutScheduler.java
b/ratis-common/src/main/java/org/apache/ratis/util/TimeoutScheduler.java
index 172018f..01ccf09 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/TimeoutScheduler.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/TimeoutScheduler.java
@@ -149,6 +149,10 @@ public final class TimeoutScheduler implements Closeable {
final TimeDuration grace = getGracePeriod();
LOG.debug("Schedule a shutdown task: grace {}, sid {}", grace, sid);
+ if (scheduler == null) {
+ shutdownTask = null;
+ return;
+ }
final ScheduledFuture<?> future = schedule(scheduler, () ->
tryShutdownScheduler(sid),
() -> "shutdown task #" + sid, grace);
shutdownTask = new ShutdownTask(sid, future);
@@ -161,7 +165,7 @@ public final class TimeoutScheduler implements Closeable {
}
private synchronized void tryShutdownScheduler(int sid) {
- if (sid == scheduleID) {
+ if (sid == scheduleID && scheduler != null) {
// No new tasks submitted, shutdown the scheduler.
LOG.debug("shutdown scheduler: sid {}", sid);
scheduler.shutdown();
@@ -176,7 +180,8 @@ public final class TimeoutScheduler implements Closeable {
onTimeout(timeout, task, t -> log.error(errorMessage.get(), t));
}
- @Override public synchronized void close() {
+ @Override
+ public synchronized void close() {
if (scheduler != null) {
LOG.debug("Closing ThreadPool");
scheduler.shutdownNow();
diff --git
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
index 45cfeed..6e34433 100644
---
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
+++
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
@@ -140,13 +140,13 @@ public class GrpcClientProtocolClient implements
Closeable {
public void close() {
Optional.ofNullable(orderedStreamObservers.getAndSet(null)).ifPresent(AsyncStreamObservers::close);
Optional.ofNullable(unorderedStreamObservers.getAndSet(null)).ifPresent(AsyncStreamObservers::close);
- scheduler.close();
channel.shutdown();
try {
channel.awaitTermination(5, TimeUnit.SECONDS);
} catch (Exception e) {
LOG.error("Unexpected exception while waiting for channel termination",
e);
}
+ scheduler.close();
}
RaftClientReplyProto groupAdd(GroupManagementRequestProto request) throws
IOException {
diff --git
a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncExceptionTests.java
b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncExceptionTests.java
index f48e989..cdb0e6b 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncExceptionTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncExceptionTests.java
@@ -19,11 +19,15 @@ package org.apache.ratis;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.RaftTestUtil.SimpleMessage;
+import org.apache.ratis.client.RaftClientConfigKeys;
import org.apache.ratis.protocol.AlreadyClosedException;
import org.apache.ratis.protocol.GroupMismatchException;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftGroup;
import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.statemachine.SimpleStateMachine4Testing;
+import org.apache.ratis.statemachine.StateMachine;
+import org.apache.ratis.util.TimeDuration;
import org.junit.Assert;
import org.junit.Test;
@@ -32,11 +36,17 @@ import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
public abstract class RaftAsyncExceptionTests<CLUSTER extends MiniRaftCluster>
extends BaseTest
implements MiniRaftCluster.Factory.Get<CLUSTER> {
+ {
+ getProperties().setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY,
+ SimpleStateMachine4Testing.class, StateMachine.class);
+ }
+
@Test
public void testGroupMismatchException() throws Exception {
runWithNewCluster(1, this::runTestGroupMismatchException);
@@ -76,4 +86,33 @@ public abstract class RaftAsyncExceptionTests<CLUSTER
extends MiniRaftCluster>
}
}
}
+
+ @Test
+ public void testTimeoutException() throws Exception {
+ runWithNewCluster(3, this::runTestTimeoutException);
+ }
+
+ private void runTestTimeoutException(CLUSTER cluster) throws Exception {
+ // send a message to make sure the cluster is working
+ try(RaftClient client = cluster.createClient()) {
+ client.send(new SimpleMessage("m0"));
+
+ RaftClientConfigKeys.Rpc.setRequestTimeout(properties.get(),
+ TimeDuration.valueOf(3, TimeUnit.SECONDS));
+ // Block StartTransaction
+ cluster.getServers().stream()
+ .map(cluster::getRaftServerImpl)
+ .map(SimpleStateMachine4Testing::get)
+ .forEach(SimpleStateMachine4Testing::blockStartTransaction);
+ final CompletableFuture<RaftClientReply> replyFuture =
client.sendAsync(new SimpleMessage("m1"));
+ Thread.sleep(10000);
+ // Unblock StartTransaction
+ cluster.getServers().stream()
+ .map(cluster::getRaftServerImpl)
+ .map(SimpleStateMachine4Testing::get)
+ .forEach(SimpleStateMachine4Testing::unblockStartTransaction);
+ // The request should succeed after start transaction is unblocked
+ Assert.assertTrue(replyFuture.get().isSuccess());
+ }
+ }
}