This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 78e95b9  RATIS-485. TimeoutScheduler is leaked by gRPC client implementation.  Contributed by Tsz Wo Nicholas Sze and Josh Elser
78e95b9 is described below

commit 78e95b9b079e84908ca3f5506d71b2aefa227990
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Fri Aug 30 14:22:31 2019 -0700

    RATIS-485. TimeoutScheduler is leaked by gRPC client implementation.  Contributed by Tsz Wo Nicholas Sze and Josh Elser
---
 .../apache/ratis/client/RaftClientConfigKeys.java  | 10 ---
 .../apache/ratis/client/impl/RaftClientImpl.java   |  8 +--
 .../ratis/client/impl/RaftClientTestUtil.java      |  4 --
 .../java/org/apache/ratis/util/PeerProxyMap.java   |  5 +-
 .../org/apache/ratis/util/TimeoutScheduler.java    | 80 ++++++++++++++++++----
 .../ratis/examples/filestore/cli/Client.java       |  1 -
 .../grpc/client/GrpcClientProtocolClient.java      |  3 +-
 .../apache/ratis/server/impl/WatchRequests.java    |  2 +-
 .../test/java/org/apache/ratis/RaftAsyncTests.java |  5 --
 .../apache/ratis/util/TestTimeoutScheduler.java    | 33 ++++++++-
 10 files changed, 104 insertions(+), 47 deletions(-)

diff --git 
a/ratis-client/src/main/java/org/apache/ratis/client/RaftClientConfigKeys.java 
b/ratis-client/src/main/java/org/apache/ratis/client/RaftClientConfigKeys.java
index b07dade..bb01249 100644
--- 
a/ratis-client/src/main/java/org/apache/ratis/client/RaftClientConfigKeys.java
+++ 
b/ratis-client/src/main/java/org/apache/ratis/client/RaftClientConfigKeys.java
@@ -62,16 +62,6 @@ public interface RaftClientConfigKeys {
     static void setMaxOutstandingRequests(RaftProperties properties, int 
outstandingRequests) {
       setInt(properties::setInt, MAX_OUTSTANDING_REQUESTS_KEY, 
outstandingRequests);
     }
-
-    String SCHEDULER_THREADS_KEY = PREFIX + ".scheduler-threads";
-    int SCHEDULER_THREADS_DEFAULT = 3;
-    static int schedulerThreads(RaftProperties properties) {
-      return getInt(properties::getInt, SCHEDULER_THREADS_KEY,
-          SCHEDULER_THREADS_DEFAULT, getDefaultLog(), requireMin(1));
-    }
-    static void setSchedulerThreads(RaftProperties properties, int 
schedulerThreads) {
-      setInt(properties::setInt, SCHEDULER_THREADS_KEY, schedulerThreads);
-    }
   }
 
   static void main(String[] args) {
diff --git 
a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java 
b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
index 7a93cab..dfd1b11 100644
--- 
a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
+++ 
b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
@@ -18,7 +18,6 @@
 package org.apache.ratis.client.impl;
 
 import org.apache.ratis.client.RaftClient;
-import org.apache.ratis.client.RaftClientConfigKeys;
 import org.apache.ratis.client.RaftClientRpc;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto.TypeCase;
@@ -97,7 +96,7 @@ final class RaftClientImpl implements RaftClient {
     Preconditions.assertTrue(retryPolicy != null, "retry policy can't be 
null");
     this.retryPolicy = retryPolicy;
 
-    scheduler = 
TimeoutScheduler.newInstance(RaftClientConfigKeys.Async.schedulerThreads(properties));
+    scheduler = TimeoutScheduler.newInstance(0);
     clientRpc.addServers(peers);
 
     this.orderedAsync = JavaUtils.memoize(() -> new OrderedAsync(this, 
properties));
@@ -376,10 +375,6 @@ final class RaftClientImpl implements RaftClient {
     }
   }
 
-  void assertScheduler(int numThreads) {
-    Preconditions.assertTrue(scheduler.getNumThreads() == numThreads);
-  }
-
   long getCallId() {
     return CALL_ID_COUNTER.get();
   }
@@ -391,6 +386,7 @@ final class RaftClientImpl implements RaftClient {
 
   @Override
   public void close() throws IOException {
+    scheduler.close();
     clientRpc.close();
   }
 }
diff --git 
a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientTestUtil.java
 
b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientTestUtil.java
index 62b7d32..fbb62f7 100644
--- 
a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientTestUtil.java
+++ 
b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientTestUtil.java
@@ -29,10 +29,6 @@ public interface RaftClientTestUtil {
     ((RaftClientImpl) 
client).getOrderedAsync().assertRequestSemaphore(expectedAvailablePermits, 
expectedQueueLength);
   }
 
-  static void assertScheduler(RaftClient client, int numThreads){
-    ((RaftClientImpl) client).assertScheduler(numThreads);
-  }
-
   static long getCallId(RaftClient client) {
     return ((RaftClientImpl) client).getCallId();
   }
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/PeerProxyMap.java 
b/ratis-common/src/main/java/org/apache/ratis/util/PeerProxyMap.java
index 014dc30..514c37e 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/PeerProxyMap.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/PeerProxyMap.java
@@ -70,10 +70,11 @@ public class PeerProxyMap<PROXY extends Closeable> 
implements Closeable {
       lifeCycle.checkStateAndClose(() -> {
         if (proxy != null) {
           try {
+            LOG.debug("{}: Closing proxy for peer {}", name, peer);
             proxy.close();
           } catch (IOException e) {
-            LOG.warn("{}: Failed to close proxy for peer {}, proxy class: ",
-                name, peer, proxy.getClass());
+            LOG.warn("{}: Failed to close proxy for peer {}, proxy class: {}",
+                name, peer, proxy.getClass(), e);
           }
         }
       });
diff --git 
a/ratis-common/src/main/java/org/apache/ratis/util/TimeoutScheduler.java 
b/ratis-common/src/main/java/org/apache/ratis/util/TimeoutScheduler.java
index db33b0f..172018f 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/TimeoutScheduler.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/TimeoutScheduler.java
@@ -21,23 +21,50 @@ import org.apache.ratis.util.function.CheckedRunnable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.concurrent.Executors;
+import java.io.Closeable;
+import java.util.Collection;
+import java.util.Optional;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
 import java.util.function.Supplier;
 
-public final class TimeoutScheduler {
+public final class TimeoutScheduler implements Closeable {
   public static final Logger LOG = 
LoggerFactory.getLogger(TimeoutScheduler.class);
 
-  private static final TimeDuration DEFAULT_GRACE_PERIOD = 
TimeDuration.valueOf(1, TimeUnit.MINUTES);
+  static final TimeDuration DEFAULT_GRACE_PERIOD = TimeDuration.valueOf(1, 
TimeUnit.MINUTES);
 
   public static TimeoutScheduler newInstance(int numThreads) {
     return new TimeoutScheduler(numThreads);
   }
 
+  class ShutdownTask implements Runnable {
+    private final int sid;
+    private final ScheduledFuture<?> future;
+
+    ShutdownTask(int sid, ScheduledFuture<?> future) {
+      this.sid = sid;
+      this.future = future;
+    }
+
+    int getSid() {
+      return sid;
+    }
+
+    @Override
+    public void run() {
+      tryShutdownScheduler(sid);
+    }
+
+    void cancel() {
+      future.cancel(false);
+    }
+  }
+
   /** When there is no tasks, the time period to wait before shutting down the 
scheduler. */
   private final AtomicReference<TimeDuration> gracePeriod = new 
AtomicReference<>(DEFAULT_GRACE_PERIOD);
 
@@ -46,16 +73,17 @@ public final class TimeoutScheduler {
   /** The scheduleID for each task */
   private int scheduleID = 0;
 
+  private ShutdownTask shutdownTask = null;
+
   private final int numThreads;
-  private volatile ScheduledExecutorService scheduler = null;
+  private volatile ScheduledThreadPoolExecutor scheduler = null;
 
   private TimeoutScheduler(int numThreads) {
     this.numThreads = numThreads;
   }
 
-  public int getNumThreads() {
-    final ScheduledExecutorService s = scheduler;
-    return s instanceof ScheduledThreadPoolExecutor? 
((ScheduledThreadPoolExecutor)s).getCorePoolSize(): numThreads;
+  int getQueueSize() {
+    return 
Optional.ofNullable(scheduler).map(ScheduledThreadPoolExecutor::getQueue).map(Collection::size).orElse(0);
   }
 
   TimeDuration getGracePeriod() {
@@ -95,7 +123,8 @@ public final class TimeoutScheduler {
     if (scheduler == null) {
       Preconditions.assertTrue(numTasks == 0);
       LOG.debug("Initialize scheduler");
-      scheduler = Executors.newScheduledThreadPool(numThreads, Daemon::new);
+      scheduler = new ScheduledThreadPoolExecutor(numThreads, (ThreadFactory) 
Daemon::new);
+      scheduler.setRemoveOnCancelPolicy(true);
     }
     numTasks++;
     final int sid = scheduleID++;
@@ -105,16 +134,29 @@ public final class TimeoutScheduler {
   }
 
   private synchronized void onTaskCompleted() {
-    if (--numTasks == 0) {
-      final int sid = scheduleID;
-      final TimeDuration grace = getGracePeriod();
-      LOG.debug("Schedule a shutdown task: grace {}, sid {}", grace, sid);
-      schedule(scheduler, () -> tryShutdownScheduler(sid), () -> "shutdown 
task #" + sid, grace);
+    if (--numTasks > 0) {
+      return;
     }
+    final int sid = scheduleID;
+    if (shutdownTask != null) {
+      if (shutdownTask.getSid() == sid) {
+        // the shutdown task is still valid
+        return;
+      }
+      // the shutdown task is invalid
+      shutdownTask.cancel();
+    }
+
+    final TimeDuration grace = getGracePeriod();
+    LOG.debug("Schedule a shutdown task: grace {}, sid {}", grace, sid);
+    final ScheduledFuture<?> future = schedule(scheduler, () -> 
tryShutdownScheduler(sid),
+        () -> "shutdown task #" + sid, grace);
+    shutdownTask = new ShutdownTask(sid, future);
   }
 
-  static void schedule(ScheduledExecutorService service, Runnable task, 
Supplier<String> name, TimeDuration timeDuration) {
-    service.schedule(LogUtils.newRunnable(LOG, task, name),
+  private static ScheduledFuture<?> schedule(
+      ScheduledExecutorService service, Runnable task, Supplier<String> name, 
TimeDuration timeDuration) {
+    return service.schedule(LogUtils.newRunnable(LOG, task, name),
         timeDuration.getDuration(), timeDuration.getUnit());
   }
 
@@ -133,4 +175,12 @@ public final class TimeoutScheduler {
   public void onTimeout(TimeDuration timeout, CheckedRunnable<?> task, Logger 
log, Supplier<String> errorMessage) {
     onTimeout(timeout, task, t -> log.error(errorMessage.get(), t));
   }
+
+  @Override public synchronized void close() {
+    if (scheduler != null) {
+      LOG.debug("Closing ThreadPool");
+      scheduler.shutdownNow();
+      scheduler = null;
+    }
+  }
 }
diff --git 
a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/Client.java
 
b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/Client.java
index 63de425..8f9c888 100644
--- 
a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/Client.java
+++ 
b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/Client.java
@@ -63,7 +63,6 @@ public abstract class Client extends SubCommandBase {
 
     RaftClientConfigKeys.Rpc.setRequestTimeout(raftProperties,
         TimeDuration.valueOf(50000, TimeUnit.MILLISECONDS));
-    RaftClientConfigKeys.Async.setSchedulerThreads(raftProperties, 10);
     RaftClientConfigKeys.Async.setMaxOutstandingRequests(raftProperties, 1000);
 
 
diff --git 
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
index 8b329e0..e2b6572 100644
--- 
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
+++ 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
@@ -79,7 +79,7 @@ public class GrpcClientProtocolClient implements Closeable {
   private final ManagedChannel channel;
 
   private final TimeDuration requestTimeoutDuration;
-  private final TimeoutScheduler scheduler = TimeoutScheduler.newInstance(3);
+  private final TimeoutScheduler scheduler = TimeoutScheduler.newInstance(0);
 
   private final RaftClientProtocolServiceBlockingStub blockingStub;
   private final RaftClientProtocolServiceStub asyncStub;
@@ -139,6 +139,7 @@ public class GrpcClientProtocolClient implements Closeable {
   public void close() {
     
Optional.ofNullable(orderedStreamObservers.getAndSet(null)).ifPresent(AsyncStreamObservers::close);
     
Optional.ofNullable(unorderedStreamObservers.getAndSet(null)).ifPresent(AsyncStreamObservers::close);
+    scheduler.close();
     channel.shutdownNow();
   }
 
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/WatchRequests.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/WatchRequests.java
index 92197e6..266e5cd 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/WatchRequests.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/WatchRequests.java
@@ -168,7 +168,7 @@ class WatchRequests {
 
   private final TimeDuration watchTimeoutNanos;
   private final TimeDuration watchTimeoutDenominationNanos;
-  private final TimeoutScheduler scheduler = TimeoutScheduler.newInstance(2);
+  private final TimeoutScheduler scheduler = TimeoutScheduler.newInstance(1);
 
   WatchRequests(Object name, RaftProperties properties) {
     this.name = name + "-" + getClass().getSimpleName();
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java 
b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
index 8ca2867..fded453 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
@@ -85,19 +85,14 @@ public abstract class RaftAsyncTests<CLUSTER extends 
MiniRaftCluster> extends Ba
     RaftClient.Builder clientBuilder = RaftClient.newBuilder()
         .setRaftGroup(RaftGroup.emptyGroup())
         .setProperties(properties);
-    int numThreads = RaftClientConfigKeys.Async.SCHEDULER_THREADS_DEFAULT;
     int maxOutstandingRequests = 
RaftClientConfigKeys.Async.MAX_OUTSTANDING_REQUESTS_DEFAULT;
     try(RaftClient client = clientBuilder.build()) {
-      RaftClientTestUtil.assertScheduler(client, numThreads);
       RaftClientTestUtil.assertAsyncRequestSemaphore(client, 
maxOutstandingRequests, 0);
     }
 
-    numThreads = 200;
     maxOutstandingRequests = 5;
     RaftClientConfigKeys.Async.setMaxOutstandingRequests(properties, 
maxOutstandingRequests);
-    RaftClientConfigKeys.Async.setSchedulerThreads(properties, numThreads);
     try(RaftClient client = clientBuilder.build()) {
-      RaftClientTestUtil.assertScheduler(client, numThreads);
       RaftClientTestUtil.assertAsyncRequestSemaphore(client, 
maxOutstandingRequests, 0);
     }
   }
diff --git 
a/ratis-test/src/test/java/org/apache/ratis/util/TestTimeoutScheduler.java 
b/ratis-test/src/test/java/org/apache/ratis/util/TestTimeoutScheduler.java
index 6a63569..fce230a 100644
--- a/ratis-test/src/test/java/org/apache/ratis/util/TestTimeoutScheduler.java
+++ b/ratis-test/src/test/java/org/apache/ratis/util/TestTimeoutScheduler.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -18,6 +18,7 @@
 package org.apache.ratis.util;
 
 import org.apache.log4j.Level;
+import org.apache.ratis.BaseTest;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -25,7 +26,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Consumer;
 
-public class TestTimeoutScheduler {
+public class TestTimeoutScheduler extends BaseTest {
   {
     LogUtils.setLogLevel(TimeoutScheduler.LOG, Level.ALL);
   }
@@ -207,4 +208,32 @@ public class TestTimeoutScheduler {
 
     errorHandler.assertNoError();
   }
+
+  @Test(timeout = 1000)
+  public void testShutdown() throws Exception {
+    final TimeoutScheduler scheduler = TimeoutScheduler.newInstance(0);
+    Assert.assertEquals(TimeoutScheduler.DEFAULT_GRACE_PERIOD, 
scheduler.getGracePeriod());
+    final ErrorHandler errorHandler = new ErrorHandler();
+
+    final int numTasks = 100;
+    for(int i = 0; i < numTasks; i++) {
+      // long timeout
+      scheduler.onTimeout(HUNDRED_MILLIS, () -> {}, errorHandler);
+    }
+    HUNDRED_MILLIS.sleep();
+    HUNDRED_MILLIS.sleep();
+    Assert.assertEquals(1, scheduler.getQueueSize()); // only 1 shutdown task 
is scheduled
+
+    final TimeDuration oneMillis = TimeDuration.valueOf(1, 
TimeUnit.MILLISECONDS);
+    for(int i = 0; i < numTasks; i++) {
+      // short timeout
+      scheduler.onTimeout(oneMillis, () -> {}, errorHandler);
+      oneMillis.sleep();
+      oneMillis.sleep();
+    }
+    HUNDRED_MILLIS.sleep();
+    Assert.assertEquals(1, scheduler.getQueueSize()); // only 1 shutdown task 
is scheduled
+
+    errorHandler.assertNoError();
+  }
 }

Reply via email to