This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 78e95b9 RATIS-485. TimeoutScheduler is leaked by gRPC client
implementation. Contributed by Tsz Wo Nicholas Sze and Josh Elser
78e95b9 is described below
commit 78e95b9b079e84908ca3f5506d71b2aefa227990
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Fri Aug 30 14:22:31 2019 -0700
RATIS-485. TimeoutScheduler is leaked by gRPC client implementation.
Contributed by Tsz Wo Nicholas Sze and Josh Elser
---
.../apache/ratis/client/RaftClientConfigKeys.java | 10 ---
.../apache/ratis/client/impl/RaftClientImpl.java | 8 +--
.../ratis/client/impl/RaftClientTestUtil.java | 4 --
.../java/org/apache/ratis/util/PeerProxyMap.java | 5 +-
.../org/apache/ratis/util/TimeoutScheduler.java | 80 ++++++++++++++++++----
.../ratis/examples/filestore/cli/Client.java | 1 -
.../grpc/client/GrpcClientProtocolClient.java | 3 +-
.../apache/ratis/server/impl/WatchRequests.java | 2 +-
.../test/java/org/apache/ratis/RaftAsyncTests.java | 5 --
.../apache/ratis/util/TestTimeoutScheduler.java | 33 ++++++++-
10 files changed, 104 insertions(+), 47 deletions(-)
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/RaftClientConfigKeys.java b/ratis-client/src/main/java/org/apache/ratis/client/RaftClientConfigKeys.java
index b07dade..bb01249 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/RaftClientConfigKeys.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/RaftClientConfigKeys.java
@@ -62,16 +62,6 @@ public interface RaftClientConfigKeys {
static void setMaxOutstandingRequests(RaftProperties properties, int
outstandingRequests) {
setInt(properties::setInt, MAX_OUTSTANDING_REQUESTS_KEY,
outstandingRequests);
}
-
- String SCHEDULER_THREADS_KEY = PREFIX + ".scheduler-threads";
- int SCHEDULER_THREADS_DEFAULT = 3;
- static int schedulerThreads(RaftProperties properties) {
- return getInt(properties::getInt, SCHEDULER_THREADS_KEY,
- SCHEDULER_THREADS_DEFAULT, getDefaultLog(), requireMin(1));
- }
- static void setSchedulerThreads(RaftProperties properties, int
schedulerThreads) {
- setInt(properties::setInt, SCHEDULER_THREADS_KEY, schedulerThreads);
- }
}
static void main(String[] args) {
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
index 7a93cab..dfd1b11 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
@@ -18,7 +18,6 @@
package org.apache.ratis.client.impl;
import org.apache.ratis.client.RaftClient;
-import org.apache.ratis.client.RaftClientConfigKeys;
import org.apache.ratis.client.RaftClientRpc;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto.TypeCase;
@@ -97,7 +96,7 @@ final class RaftClientImpl implements RaftClient {
Preconditions.assertTrue(retryPolicy != null, "retry policy can't be
null");
this.retryPolicy = retryPolicy;
- scheduler =
TimeoutScheduler.newInstance(RaftClientConfigKeys.Async.schedulerThreads(properties));
+ scheduler = TimeoutScheduler.newInstance(0);
clientRpc.addServers(peers);
this.orderedAsync = JavaUtils.memoize(() -> new OrderedAsync(this,
properties));
@@ -376,10 +375,6 @@ final class RaftClientImpl implements RaftClient {
}
}
- void assertScheduler(int numThreads) {
- Preconditions.assertTrue(scheduler.getNumThreads() == numThreads);
- }
-
long getCallId() {
return CALL_ID_COUNTER.get();
}
@@ -391,6 +386,7 @@ final class RaftClientImpl implements RaftClient {
@Override
public void close() throws IOException {
+ scheduler.close();
clientRpc.close();
}
}
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientTestUtil.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientTestUtil.java
index 62b7d32..fbb62f7 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientTestUtil.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientTestUtil.java
@@ -29,10 +29,6 @@ public interface RaftClientTestUtil {
((RaftClientImpl)
client).getOrderedAsync().assertRequestSemaphore(expectedAvailablePermits,
expectedQueueLength);
}
- static void assertScheduler(RaftClient client, int numThreads){
- ((RaftClientImpl) client).assertScheduler(numThreads);
- }
-
static long getCallId(RaftClient client) {
return ((RaftClientImpl) client).getCallId();
}
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/PeerProxyMap.java b/ratis-common/src/main/java/org/apache/ratis/util/PeerProxyMap.java
index 014dc30..514c37e 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/PeerProxyMap.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/PeerProxyMap.java
@@ -70,10 +70,11 @@ public class PeerProxyMap<PROXY extends Closeable> implements Closeable {
lifeCycle.checkStateAndClose(() -> {
if (proxy != null) {
try {
+ LOG.debug("{}: Closing proxy for peer {}", name, peer);
proxy.close();
} catch (IOException e) {
- LOG.warn("{}: Failed to close proxy for peer {}, proxy class: ",
- name, peer, proxy.getClass());
+ LOG.warn("{}: Failed to close proxy for peer {}, proxy class: {}",
+ name, peer, proxy.getClass(), e);
}
}
});
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/TimeoutScheduler.java b/ratis-common/src/main/java/org/apache/ratis/util/TimeoutScheduler.java
index db33b0f..172018f 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/TimeoutScheduler.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/TimeoutScheduler.java
@@ -21,23 +21,50 @@ import org.apache.ratis.util.function.CheckedRunnable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.concurrent.Executors;
+import java.io.Closeable;
+import java.util.Collection;
+import java.util.Optional;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Supplier;
-public final class TimeoutScheduler {
+public final class TimeoutScheduler implements Closeable {
public static final Logger LOG =
LoggerFactory.getLogger(TimeoutScheduler.class);
- private static final TimeDuration DEFAULT_GRACE_PERIOD =
TimeDuration.valueOf(1, TimeUnit.MINUTES);
+ static final TimeDuration DEFAULT_GRACE_PERIOD = TimeDuration.valueOf(1,
TimeUnit.MINUTES);
public static TimeoutScheduler newInstance(int numThreads) {
return new TimeoutScheduler(numThreads);
}
+ class ShutdownTask implements Runnable {
+ private final int sid;
+ private final ScheduledFuture<?> future;
+
+ ShutdownTask(int sid, ScheduledFuture<?> future) {
+ this.sid = sid;
+ this.future = future;
+ }
+
+ int getSid() {
+ return sid;
+ }
+
+ @Override
+ public void run() {
+ tryShutdownScheduler(sid);
+ }
+
+ void cancel() {
+ future.cancel(false);
+ }
+ }
+
/** When there is no tasks, the time period to wait before shutting down the
scheduler. */
private final AtomicReference<TimeDuration> gracePeriod = new
AtomicReference<>(DEFAULT_GRACE_PERIOD);
@@ -46,16 +73,17 @@ public final class TimeoutScheduler {
/** The scheduleID for each task */
private int scheduleID = 0;
+ private ShutdownTask shutdownTask = null;
+
private final int numThreads;
- private volatile ScheduledExecutorService scheduler = null;
+ private volatile ScheduledThreadPoolExecutor scheduler = null;
private TimeoutScheduler(int numThreads) {
this.numThreads = numThreads;
}
- public int getNumThreads() {
- final ScheduledExecutorService s = scheduler;
- return s instanceof ScheduledThreadPoolExecutor?
((ScheduledThreadPoolExecutor)s).getCorePoolSize(): numThreads;
+ int getQueueSize() {
+ return
Optional.ofNullable(scheduler).map(ScheduledThreadPoolExecutor::getQueue).map(Collection::size).orElse(0);
}
TimeDuration getGracePeriod() {
@@ -95,7 +123,8 @@ public final class TimeoutScheduler {
if (scheduler == null) {
Preconditions.assertTrue(numTasks == 0);
LOG.debug("Initialize scheduler");
- scheduler = Executors.newScheduledThreadPool(numThreads, Daemon::new);
+ scheduler = new ScheduledThreadPoolExecutor(numThreads, (ThreadFactory)
Daemon::new);
+ scheduler.setRemoveOnCancelPolicy(true);
}
numTasks++;
final int sid = scheduleID++;
@@ -105,16 +134,29 @@ public final class TimeoutScheduler {
}
private synchronized void onTaskCompleted() {
- if (--numTasks == 0) {
- final int sid = scheduleID;
- final TimeDuration grace = getGracePeriod();
- LOG.debug("Schedule a shutdown task: grace {}, sid {}", grace, sid);
- schedule(scheduler, () -> tryShutdownScheduler(sid), () -> "shutdown
task #" + sid, grace);
+ if (--numTasks > 0) {
+ return;
}
+ final int sid = scheduleID;
+ if (shutdownTask != null) {
+ if (shutdownTask.getSid() == sid) {
+ // the shutdown task is still valid
+ return;
+ }
+ // the shutdown task is invalid
+ shutdownTask.cancel();
+ }
+
+ final TimeDuration grace = getGracePeriod();
+ LOG.debug("Schedule a shutdown task: grace {}, sid {}", grace, sid);
+ final ScheduledFuture<?> future = schedule(scheduler, () ->
tryShutdownScheduler(sid),
+ () -> "shutdown task #" + sid, grace);
+ shutdownTask = new ShutdownTask(sid, future);
}
- static void schedule(ScheduledExecutorService service, Runnable task,
Supplier<String> name, TimeDuration timeDuration) {
- service.schedule(LogUtils.newRunnable(LOG, task, name),
+ private static ScheduledFuture<?> schedule(
+ ScheduledExecutorService service, Runnable task, Supplier<String> name,
TimeDuration timeDuration) {
+ return service.schedule(LogUtils.newRunnable(LOG, task, name),
timeDuration.getDuration(), timeDuration.getUnit());
}
@@ -133,4 +175,12 @@ public final class TimeoutScheduler {
public void onTimeout(TimeDuration timeout, CheckedRunnable<?> task, Logger
log, Supplier<String> errorMessage) {
onTimeout(timeout, task, t -> log.error(errorMessage.get(), t));
}
+
+ @Override public synchronized void close() {
+ if (scheduler != null) {
+ LOG.debug("Closing ThreadPool");
+ scheduler.shutdownNow();
+ scheduler = null;
+ }
+ }
}
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/Client.java b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/Client.java
index 63de425..8f9c888 100644
--- a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/Client.java
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/Client.java
@@ -63,7 +63,6 @@ public abstract class Client extends SubCommandBase {
RaftClientConfigKeys.Rpc.setRequestTimeout(raftProperties,
TimeDuration.valueOf(50000, TimeUnit.MILLISECONDS));
- RaftClientConfigKeys.Async.setSchedulerThreads(raftProperties, 10);
RaftClientConfigKeys.Async.setMaxOutstandingRequests(raftProperties, 1000);
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
index 8b329e0..e2b6572 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
@@ -79,7 +79,7 @@ public class GrpcClientProtocolClient implements Closeable {
private final ManagedChannel channel;
private final TimeDuration requestTimeoutDuration;
- private final TimeoutScheduler scheduler = TimeoutScheduler.newInstance(3);
+ private final TimeoutScheduler scheduler = TimeoutScheduler.newInstance(0);
private final RaftClientProtocolServiceBlockingStub blockingStub;
private final RaftClientProtocolServiceStub asyncStub;
@@ -139,6 +139,7 @@ public class GrpcClientProtocolClient implements Closeable {
public void close() {
Optional.ofNullable(orderedStreamObservers.getAndSet(null)).ifPresent(AsyncStreamObservers::close);
Optional.ofNullable(unorderedStreamObservers.getAndSet(null)).ifPresent(AsyncStreamObservers::close);
+ scheduler.close();
channel.shutdownNow();
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/WatchRequests.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/WatchRequests.java
index 92197e6..266e5cd 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/WatchRequests.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/WatchRequests.java
@@ -168,7 +168,7 @@ class WatchRequests {
private final TimeDuration watchTimeoutNanos;
private final TimeDuration watchTimeoutDenominationNanos;
- private final TimeoutScheduler scheduler = TimeoutScheduler.newInstance(2);
+ private final TimeoutScheduler scheduler = TimeoutScheduler.newInstance(1);
WatchRequests(Object name, RaftProperties properties) {
this.name = name + "-" + getClass().getSimpleName();
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
index 8ca2867..fded453 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
@@ -85,19 +85,14 @@ public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster> extends Ba
RaftClient.Builder clientBuilder = RaftClient.newBuilder()
.setRaftGroup(RaftGroup.emptyGroup())
.setProperties(properties);
- int numThreads = RaftClientConfigKeys.Async.SCHEDULER_THREADS_DEFAULT;
int maxOutstandingRequests =
RaftClientConfigKeys.Async.MAX_OUTSTANDING_REQUESTS_DEFAULT;
try(RaftClient client = clientBuilder.build()) {
- RaftClientTestUtil.assertScheduler(client, numThreads);
RaftClientTestUtil.assertAsyncRequestSemaphore(client,
maxOutstandingRequests, 0);
}
- numThreads = 200;
maxOutstandingRequests = 5;
RaftClientConfigKeys.Async.setMaxOutstandingRequests(properties,
maxOutstandingRequests);
- RaftClientConfigKeys.Async.setSchedulerThreads(properties, numThreads);
try(RaftClient client = clientBuilder.build()) {
- RaftClientTestUtil.assertScheduler(client, numThreads);
RaftClientTestUtil.assertAsyncRequestSemaphore(client,
maxOutstandingRequests, 0);
}
}
diff --git a/ratis-test/src/test/java/org/apache/ratis/util/TestTimeoutScheduler.java b/ratis-test/src/test/java/org/apache/ratis/util/TestTimeoutScheduler.java
index 6a63569..fce230a 100644
--- a/ratis-test/src/test/java/org/apache/ratis/util/TestTimeoutScheduler.java
+++ b/ratis-test/src/test/java/org/apache/ratis/util/TestTimeoutScheduler.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -18,6 +18,7 @@
package org.apache.ratis.util;
import org.apache.log4j.Level;
+import org.apache.ratis.BaseTest;
import org.junit.Assert;
import org.junit.Test;
@@ -25,7 +26,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
-public class TestTimeoutScheduler {
+public class TestTimeoutScheduler extends BaseTest {
{
LogUtils.setLogLevel(TimeoutScheduler.LOG, Level.ALL);
}
@@ -207,4 +208,32 @@ public class TestTimeoutScheduler {
errorHandler.assertNoError();
}
+
+ @Test(timeout = 1000)
+ public void testShutdown() throws Exception {
+ final TimeoutScheduler scheduler = TimeoutScheduler.newInstance(0);
+ Assert.assertEquals(TimeoutScheduler.DEFAULT_GRACE_PERIOD,
scheduler.getGracePeriod());
+ final ErrorHandler errorHandler = new ErrorHandler();
+
+ final int numTasks = 100;
+ for(int i = 0; i < numTasks; i++) {
+ // long timeout
+ scheduler.onTimeout(HUNDRED_MILLIS, () -> {}, errorHandler);
+ }
+ HUNDRED_MILLIS.sleep();
+ HUNDRED_MILLIS.sleep();
+ Assert.assertEquals(1, scheduler.getQueueSize()); // only 1 shutdown task
is scheduled
+
+ final TimeDuration oneMillis = TimeDuration.valueOf(1,
TimeUnit.MILLISECONDS);
+ for(int i = 0; i < numTasks; i++) {
+ // short timeout
+ scheduler.onTimeout(oneMillis, () -> {}, errorHandler);
+ oneMillis.sleep();
+ oneMillis.sleep();
+ }
+ HUNDRED_MILLIS.sleep();
+ Assert.assertEquals(1, scheduler.getQueueSize()); // only 1 shutdown task
is scheduled
+
+ errorHandler.assertNoError();
+ }
}