asfgit closed pull request #402: ACCUMULO-4615: Updated get status for thread 
safety and with a per-task timeout
URL: https://github.com/apache/accumulo/pull/402
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff of a foreign pull request (one that
comes from a fork) once it is merged, the diff is reproduced below for
the sake of provenance:

diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java 
b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index eea039f75d..ac16aa5fc5 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -344,6 +344,8 @@
       "The time between adjustments of the coordinator thread pool"),
   MASTER_STATUS_THREAD_POOL_SIZE("master.status.threadpool.size", "1", 
PropertyType.COUNT,
       "The number of threads to use when fetching the tablet server status for 
balancing."),
+  MASTER_STATUS_THREAD_TIMEOUT("master.status.thread.timeout", "6s", 
PropertyType.TIMEDURATION,
+      "The timeout that master will use when collecting each tablet server 
status."),
   MASTER_METADATA_SUSPENDABLE("master.metadata.suspendable", "false", 
PropertyType.BOOLEAN,
       "Allow tablets for the " + MetadataTable.NAME
           + " table to be suspended via table.suspend.duration."),
diff --git a/server/master/src/main/java/org/apache/accumulo/master/Master.java 
b/server/master/src/main/java/org/apache/accumulo/master/Master.java
index 9414b98543..7f9f8c4977 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/Master.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/Master.java
@@ -32,9 +32,8 @@
 import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeMap;
+import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -86,6 +85,9 @@
 import org.apache.accumulo.fate.zookeeper.ZooLock.LockLossReason;
 import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
 import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy;
+import org.apache.accumulo.master.TimeoutTaskExecutor.ExceptionCallback;
+import org.apache.accumulo.master.TimeoutTaskExecutor.SuccessCallback;
+import org.apache.accumulo.master.TimeoutTaskExecutor.TimeoutCallback;
 import org.apache.accumulo.master.metrics.MasterMetricsFactory;
 import org.apache.accumulo.master.recovery.RecoveryManager;
 import org.apache.accumulo.master.replication.MasterReplicationCoordinator;
@@ -1147,62 +1149,94 @@ private long balanceTablets() {
   private SortedMap<TServerInstance,TabletServerStatus> gatherTableInformation(
       Set<TServerInstance> currentServers) {
     long start = System.currentTimeMillis();
+
     int threads = 
Math.max(getConfiguration().getCount(Property.MASTER_STATUS_THREAD_POOL_SIZE), 
1);
-    ExecutorService tp = Executors.newFixedThreadPool(threads);
-    final SortedMap<TServerInstance,TabletServerStatus> result = new 
TreeMap<>();
-    for (TServerInstance serverInstance : currentServers) {
-      final TServerInstance server = serverInstance;
-      tp.submit(new Runnable() {
+    long timeout = 
getConfiguration().getTimeInMillis(Property.MASTER_STATUS_THREAD_TIMEOUT);
+    final SortedMap<TServerInstance,TabletServerStatus> results = new 
TreeMap<>();
+
+    try (
+        TimeoutTaskExecutor<TabletServerStatus,GetTServerStatus> executor = 
new TimeoutTaskExecutor<>(
+            threads, timeout, currentServers.size())) {
+      executor.onSuccess(new 
SuccessCallback<TabletServerStatus,GetTServerStatus>() {
         @Override
-        public void run() {
-          try {
-            Thread t = Thread.currentThread();
-            String oldName = t.getName();
-            try {
-              t.setName("Getting status from " + server);
-              TServerConnection connection = tserverSet.getConnection(server);
-              if (connection == null)
-                throw new IOException("No connection to " + server);
-              TabletServerStatus status = connection.getTableMap(false);
-              result.put(server, status);
-            } finally {
-              t.setName(oldName);
-            }
-          } catch (Exception ex) {
-            log.error("unable to get tablet server status " + server + " " + 
ex.toString());
-            log.debug("unable to get tablet server status " + server, ex);
-            if (badServers.get(server).incrementAndGet() > 
MAX_BAD_STATUS_COUNT) {
-              log.warn("attempting to stop " + server);
-              try {
-                TServerConnection connection = 
tserverSet.getConnection(server);
-                if (connection != null) {
-                  connection.halt(masterLock);
-                }
-              } catch (TTransportException e) {
-                // ignore: it's probably down
-              } catch (Exception e) {
-                log.info("error talking to troublesome tablet server ", e);
-              }
-              badServers.remove(server);
-            }
-          }
+        public void accept(GetTServerStatus task, TabletServerStatus result) {
+          results.put(task.server, result);
         }
       });
-    }
-    tp.shutdown();
-    try {
-      
tp.awaitTermination(getConfiguration().getTimeInMillis(Property.TSERV_CLIENT_TIMEOUT)
 * 2,
-          TimeUnit.MILLISECONDS);
+
+      executor.onException(new ExceptionCallback<GetTServerStatus>() {
+        @Override
+        public void accept(GetTServerStatus task, Exception e) {
+          log.error("Exception occurred while getting status from " + 
task.server, e);
+          handleBadServer(task.server);
+        }
+      });
+
+      executor.onTimeout(new TimeoutCallback<GetTServerStatus>() {
+        @Override
+        public void accept(GetTServerStatus task) {
+          log.warn("Timed out while fetching status from " + task.server);
+          handleBadServer(task.server);
+        }
+      });
+
+      for (TServerInstance server : currentServers) {
+        executor.submit(new GetTServerStatus(server));
+      }
+
+      executor.complete();
     } catch (InterruptedException e) {
-      log.debug("Interrupted while fetching status");
+      log.debug("StatusThread interrupted while waiting for tserver status 
collection.", e);
     }
+
     synchronized (badServers) {
       badServers.keySet().retainAll(currentServers);
-      badServers.keySet().removeAll(result.keySet());
+      badServers.keySet().removeAll(results.keySet());
     }
+
     log.debug(String.format("Finished gathering information from %d servers in 
%.2f seconds",
-        result.size(), (System.currentTimeMillis() - start) / 1000.));
-    return result;
+        currentServers.size(), (System.currentTimeMillis() - start) / 1000.));
+    return results;
+  }
+
+  private void handleBadServer(TServerInstance server) {
+    if (badServers.get(server).incrementAndGet() > MAX_BAD_STATUS_COUNT) {
+      log.warn("attempting to stop " + server);
+      try {
+        TServerConnection connection = tserverSet.getConnection(server);
+        if (connection != null) {
+          connection.halt(masterLock);
+        }
+      } catch (TTransportException e) {
+        // ignore: it's probably down
+      } catch (Exception e) {
+        log.info("error talking to troublesome tablet server ", e);
+      }
+      badServers.remove(server);
+    }
+  }
+
+  private class GetTServerStatus implements Callable<TabletServerStatus> {
+    public final TServerInstance server;
+
+    public GetTServerStatus(TServerInstance server) {
+      this.server = server;
+    }
+
+    @Override
+    public TabletServerStatus call() throws Exception {
+      Thread t = Thread.currentThread();
+      String oldName = t.getName();
+      try {
+        t.setName("Getting status from " + server);
+        TServerConnection connection = tserverSet.getConnection(server);
+        if (connection == null)
+          throw new IOException("No connection to " + server);
+        return connection.getTableMap(false);
+      } finally {
+        t.setName(oldName);
+      }
+    }
   }
 
   public void run() throws IOException, InterruptedException, KeeperException {
diff --git 
a/server/master/src/main/java/org/apache/accumulo/master/TimeoutTaskExecutor.java
 
b/server/master/src/main/java/org/apache/accumulo/master/TimeoutTaskExecutor.java
new file mode 100644
index 0000000000..3a966c2bc3
--- /dev/null
+++ 
b/server/master/src/main/java/org/apache/accumulo/master/TimeoutTaskExecutor.java
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.master;
+
+import java.util.ArrayList;
+import java.util.ConcurrentModificationException;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Runs one or more tasks with a timeout per task (instead of a timeout for 
the entire pool). Uses
+ * callbacks to invoke functions for tasks that succeed, time out, or throw an exception.
+ *  
+ * This class uses an underlying fixed thread pool to schedule the submitted 
tasks. Once a task is
+ * started by a worker thread, its desired end time is recorded and used to 
determine the timeout for
+ * the task's associated {@link Future}.
+ *  
+ * The timeout will not be exact as the start time is recorded prior to 
invoking the
+ * {@link Callable}. This may result in an effective timeout that is slightly 
smaller than expected.
+ * The timeout used during initialization should be adjusted accordingly.
+ *  
+ * The {@link TimeoutTaskExecutor} itself is not a thread-safe class. Only a 
single thread should
+ * submit tasks and complete them. The callback methods will be invoked from 
the same thread that
+ * called TimeoutTaskExecutor.complete(), so the callback methods need not be 
thread-safe.
+ *
+ * @param <T>
+ *          The return type for the corresponding Callable.
+ * @param <C>
+ *          The type of Callable submitted to this executor.
+ */
+@NotThreadSafe
+public class TimeoutTaskExecutor<T,C extends Callable<T>> implements 
AutoCloseable {
+
+  private final static Logger log = 
LoggerFactory.getLogger(TimeoutTaskExecutor.class);
+
+  private final long timeoutInNanos;
+  private final ExecutorService executorService;
+  private final BlockingQueue<WrappedTask> startedTasks;
+  private final List<WrappedTask> wrappedTasks;
+
+  private SuccessCallback<T,C> successCallback;
+  private ExceptionCallback<C> exceptionCallback;
+  private TimeoutCallback<C> timeoutCallback;
+
+  private volatile boolean isCompleting = false;
+
+  /**
+   * Constructs a new TimeoutTaskExecutor that will use the given number of 
worker threads and
+   * timeout. Takes an expected number of Callables to initialize the 
underlying data structures
+   * appropriately.
+   *  
+   * If the expectedNumCallables is sized too small, worker threads will block 
before running their task
+   * once the internal queue is full, until complete() drains the queue.
+   *
+   * @param numThreads
+   *          The number of threads to use.
+   * @param timeoutInMillis
+   *          The timeout for each task in milliseconds.
+   * @param expectedNumCallables
+   *          The expected number of callables you will schedule. Note this is 
used for an
+   *          underlying BlockingQueue. If sized too small this will cause 
worker threads to block
+   *          until complete() drains the queue.
+   * @throws IllegalArgumentException
+   *           If numThreads is less than 1 or expectedNumCallables is 
negative. NOTE: zero is also rejected, by the ArrayBlockingQueue constructor.
+   */
+  public TimeoutTaskExecutor(int numThreads, long timeoutInMillis, int 
expectedNumCallables) {
+    Preconditions.checkArgument(numThreads >= 1, "Number of threads must be at 
least 1.");
+    Preconditions.checkArgument(expectedNumCallables >= 0,
+        "The expected number of callables must be non-negative.");
+
+    this.executorService = Executors.newFixedThreadPool(numThreads);
+    this.startedTasks = new ArrayBlockingQueue<>(expectedNumCallables);
+    this.timeoutInNanos = TimeUnit.MILLISECONDS.toNanos(timeoutInMillis);
+    this.wrappedTasks = new ArrayList<>(expectedNumCallables);
+  }
+
+  /**
+   * Submits a new task to the executor.
+   *
+   * @param callable
+   *          Task to run
+   * @throws ConcurrentModificationException
+   *           if this method is invoked while a complete() operation is in 
progress.
+   */
+  public void submit(C callable) {
+    if (isCompleting) {
+      throw new ConcurrentModificationException(
+          "TimeoutTaskExecutor is not a thread-safe class but complete() is 
currently in progress.");
+    }
+
+    WrappedTask wt = new WrappedTask(callable);
+    wt.future = executorService.submit(wt);
+    wrappedTasks.add(wt);
+  }
+
+  /**
+   * Registers the callback to use on successful tasks.
+   *
+   * @param successCallback
+   *          The callback function to invoke on success.
+   * @throws NullPointerException
+   *           when a null successCallback is passed in
+   */
+  public void onSuccess(SuccessCallback<T,C> successCallback) {
+    this.successCallback = Objects.requireNonNull(successCallback,
+        "Must provide a non-null successCallback.");
+  }
+
+  /**
+   * Registers the callback to use on tasks that throw exceptions.
+   *
+   * @param exceptionCallback
+   *          The callback function to invoke on exceptions.
+   * @throws NullPointerException
+   *           when a null exceptionCallback is passed in
+   */
+  public void onException(ExceptionCallback<C> exceptionCallback) {
+    this.exceptionCallback = Objects.requireNonNull(exceptionCallback,
+        "Must provide a non-null exceptionCallback.");
+  }
+
+  /**
+   * Registers the callback to use on tasks that time out.
+   *
+   * @param timeoutCallback
+   *          The callback function to invoke on timeouts.
+   * @throws NullPointerException
+   *           when a null timeoutCallback is passed in
+   */
+  public void onTimeout(TimeoutCallback<C> timeoutCallback) {
+    this.timeoutCallback = Objects.requireNonNull(timeoutCallback,
+        "Must provide a non-null timeoutCallback.");
+  }
+
+  /**
+   * Completes all the current tasks by dispatching to the appropriate 
callback.
+   *
+   * @throws IllegalStateException
+   *           If all of the callbacks were not registered before calling this 
method.
+   * @throws InterruptedException
+   *           If interrupted while awaiting callable results.
+   */
+  public void complete() throws InterruptedException {
+    isCompleting = true;
+
+    try {
+      Preconditions.checkState(successCallback != null,
+          "Must set a success callback before completing " + this);
+      Preconditions.checkState(exceptionCallback != null,
+          "Must set an exception callback before completing " + this);
+      Preconditions.checkState(timeoutCallback != null,
+          "Must set a timeout callback before completing " + this);
+
+      int unfinished = wrappedTasks.size();
+
+      while (unfinished > 0) {
+        // poll for twice the timeout which should definitely yield a new 
running task
+        WrappedTask wt = startedTasks.poll(timeoutInNanos * 2, 
TimeUnit.NANOSECONDS);
+        if (wt != null) {
+          completeTask(wt);
+          wt.hasCompleted = true;
+          unfinished--;
+        }
+      }
+
+      wrappedTasks.clear();
+    } finally {
+      isCompleting = false;
+    }
+  }
+
+  private void completeTask(WrappedTask wt) throws InterruptedException {
+    try {
+      handleSuccess(wt);
+    } catch (InterruptedException e) {
+      throw e;
+    } catch (TimeoutException e) {
+      handleTimeout(wt);
+    } catch (Exception e) {
+      handleException(wt, e);
+    }
+  }
+
+  private void handleSuccess(WrappedTask wt)
+      throws InterruptedException, ExecutionException, TimeoutException {
+    long waitTime = wt.endTime - System.nanoTime();
+    waitTime = (waitTime < 0 ? 0 : waitTime);
+    successCallback.accept(wt.callable, wt.future.get(waitTime, 
TimeUnit.NANOSECONDS));
+  }
+
+  private void handleTimeout(WrappedTask wt) {
+    wt.future.cancel(true);
+    timeoutCallback.accept(wt.callable);
+  }
+
+  private void handleException(WrappedTask wt, Exception e) {
+    exceptionCallback.accept(wt.callable, e);
+  }
+
+  @Override
+  public void close() {
+    try {
+      executorService.shutdownNow();
+    } catch (Exception e) {
+      log.warn("Error while shutting down " + this, e);
+    }
+  }
+
+  /*
+   * A wrapper for a Callable that keeps additional information. This tracks 
the desired end time,
+   * if it has completed (either finished or cancelled), and keeps the 
associated future.
+   *
+   * The only state shared between the executor thread and the worker threads 
is the startedTasks
+   * BlockingQueue and the endTime variable. The endTime will be set when the 
worker starts by the
+   * worker thread and then read by the executor thread.
+   */
+  private class WrappedTask implements Callable<T> {
+    final C callable;
+
+    // Set by worker thread and read by master thread
+    volatile long endTime;
+
+    // Set and read only by master thread
+    boolean hasCompleted = false;
+
+    Future<T> future;
+
+    WrappedTask(C callable) {
+      this.callable = callable;
+    }
+
+    @Override
+    public T call() throws Exception {
+      endTime = timeoutInNanos + System.nanoTime();
+      startedTasks.put(this);
+      return callable.call();
+    }
+  }
+
+  /**
+   * Callback interface for a task that was successful.
+   *
+   * @param <T>
+   *          The result of the Callable
+   * @param <C>
+   *          The Callable
+   */
+  public interface SuccessCallback<T,C> {
+    void accept(C task, T result);
+  }
+
+  /**
+   * Callback interface for a task that threw an Exception.
+   *
+   * @param <C>
+   *          The Callable
+   */
+  public interface ExceptionCallback<C> {
+    void accept(C task, Exception e);
+  }
+
+  /**
+   * Callback interface for a task that timed out.
+   *
+   * @param <C>
+   *          The Callable
+   */
+  public interface TimeoutCallback<C> {
+    void accept(C task);
+  }
+}
diff --git 
a/server/master/src/test/java/org/apache/accumulo/master/TimeoutTaskExecutorTest.java
 
b/server/master/src/test/java/org/apache/accumulo/master/TimeoutTaskExecutorTest.java
new file mode 100644
index 0000000000..3765319163
--- /dev/null
+++ 
b/server/master/src/test/java/org/apache/accumulo/master/TimeoutTaskExecutorTest.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.master;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.concurrent.Callable;
+
+import org.apache.accumulo.master.TimeoutTaskExecutor.SuccessCallback;
+import org.apache.accumulo.master.TimeoutTaskExecutor.TimeoutCallback;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.Iterables;
+
+public class TimeoutTaskExecutorTest {
+
+  private TimeoutTaskExecutor<String,DummyTask> executor;
+  private long timeout = 100;
+
+  private Collection<String> results;
+  private Collection<DummyTask> timeouts;
+
+  @Before
+  public void setup() {
+    int numThreads = 2;
+    executor = new TimeoutTaskExecutor<>(numThreads, timeout, 3);
+
+    results = new ArrayList<>();
+    timeouts = new ArrayList<>();
+
+    executor.onSuccess(new SuccessCallback<String,DummyTask>() {
+      @Override
+      public void accept(DummyTask task, String result) {
+        results.add(result);
+      }
+    });
+
+    executor.onTimeout(new TimeoutCallback<DummyTask>() {
+      @Override
+      public void accept(DummyTask task) {
+        timeouts.add(task);
+      }
+    });
+
+    executor.onException(new 
TimeoutTaskExecutor.ExceptionCallback<DummyTask>() {
+      @Override
+      public void accept(DummyTask task, Exception e) {
+        e.printStackTrace();
+        fail("Unexpected exception");
+      }
+    });
+  }
+
+  @Test
+  public void shouldExecuteTasks() throws InterruptedException {
+    executor.submit(new DummyTask("one", 0));
+    executor.submit(new DummyTask("two", 0));
+
+    executor.complete();
+
+    assertThat(results.contains("one"), is(true));
+    assertThat(results.contains("two"), is(true));
+    assertThat(timeouts.isEmpty(), is(true));
+  }
+
+  @Test
+  public void shouldReportTimedOutTasks() throws InterruptedException {
+    executor.submit(new DummyTask("successful", 0));
+    executor.submit(new DummyTask("timeout", timeout * 2));
+
+    executor.complete();
+
+    DummyTask task = Iterables.get(timeouts, 0);
+
+    assertThat(timeouts.size(), is(1));
+    assertThat(task.result, is("timeout"));
+  }
+
+  @Test
+  public void slowTasksShouldNotPreventOthersFromRunning() throws Exception {
+    // Clog up the threadpool with misbehaving tasks
+    executor.submit(new MisbehavingTask("slow task 1", timeout * 2));
+    executor.submit(new MisbehavingTask("slow task 2", timeout * 2));
+    executor.submit(new MisbehavingTask("slow task 3", timeout * 2));
+    executor.submit(new DummyTask("good task", 0L));
+
+    executor.complete();
+
+    // Some of the slow tasks may complete, but we want to ensure
+    // that the well behaving task completes.
+    assertThat(results.size() >= 1, is(true));
+
+    boolean foundGoodTask = false;
+    for (String t : results) {
+      if (t.equals("good task")) {
+        foundGoodTask = true;
+      }
+    }
+
+    assertThat(foundGoodTask, is(true));
+  }
+
+  @Test
+  public void shouldAllowMoreTasksAfterComplete() throws Exception {
+    executor.submit(new DummyTask("task1", 0L));
+    executor.complete();
+    assertThat(results.size(), is(1));
+    assertThat(Iterables.get(results, 0), is("task1"));
+
+    results.clear();
+
+    executor.submit(new DummyTask("task2", 0L));
+    executor.complete();
+    assertThat(results.size(), is(1));
+    assertThat(Iterables.get(results, 0), is("task2"));
+  }
+
+  @After
+  public void tearDown() {
+    executor.close();
+  }
+
+  /*
+   * Task that will sleep and then return the given result.
+   */
+  private static class DummyTask implements Callable<String> {
+    private final String result;
+    private final long timeout;
+
+    public DummyTask(String result, long timeout) {
+      this.result = result;
+      this.timeout = timeout;
+    }
+
+    @Override
+    public String call() throws Exception {
+      Thread.sleep(timeout);
+      return result;
+    }
+  }
+
+  /*
+   * Task that will misbehave by ignoring the first interrupt attempt and 
continuing to sleep for one
+   * extra cycle.
+   */
+  private static class MisbehavingTask extends DummyTask {
+    public MisbehavingTask(String result, long timeout) {
+      super(result, timeout);
+    }
+
+    @Override
+    public String call() throws Exception {
+      try {
+        return super.call();
+      } catch (InterruptedException e) {
+        return super.call();
+      }
+    }
+  }
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to