keith-turner commented on code in PR #6021:
URL: https://github.com/apache/accumulo/pull/6021#discussion_r2625035401
##########
core/src/main/java/org/apache/accumulo/core/fate/Fate.java:
##########
@@ -99,10 +99,25 @@ public void run() {
store.setStatus(tid, IN_PROGRESS);
}
op = executeCall(tid, op);
+ // It's possible that a Fate operation impl
+ // may not do the right thing with an
+ // InterruptedException.
+ if (Thread.currentThread().isInterrupted()) {
+ throw new InterruptedException("Fate Transaction Runner thread interrupted");
+ }
} else {
continue;
}
-
+ } catch (InterruptedException e) {
Review Comment:
Could move this check down into the `catch (Exception e) {` block, where it
can check whether the exception or its cause is an InterruptedException, like [this
function](https://github.com/apache/accumulo/blob/960bd08820d2f1477ba7241a04dd576fbd78fb20/core/src/main/java/org/apache/accumulo/core/util/ShutdownUtil.java#L39)
does for IOException. Sometimes code will wrap an InterruptedException
w/ some kind of runtime exception.
Also, I do not know whether the exception rethrown by this catch block will
go to the next catch block; I do not think it will, but I am not sure. If not,
then it will not transition to failed. That could be another reason to move
the check into the `catch (Exception e)` block. When it is in that block, it
can decide whether transition to failed should be called or not.
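
A minimal sketch of such a helper, for illustration only. The class name `InterruptCheck` is hypothetical and not existing Accumulo code; the method name follows the `isInterruptedException` naming used later in this review. It only walks the cause chain and is not a copy of the linked `ShutdownUtil` function.

```java
// Hypothetical helper; not part of Accumulo.
final class InterruptCheck {
  private InterruptCheck() {}

  /** Returns true if e, or any exception in its cause chain, is an InterruptedException. */
  static boolean isInterruptedException(Throwable e) {
    int depth = 0; // bound the walk in case of a cyclic cause chain
    for (Throwable t = e; t != null && depth < 64; t = t.getCause(), depth++) {
      if (t instanceof InterruptedException) {
        return true;
      }
    }
    return false;
  }
}
```

With something like this, the `catch (Exception e)` block could test the caught exception once, whether the InterruptedException arrived directly or wrapped in a runtime exception.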
##########
test/src/main/java/org/apache/accumulo/test/fate/zookeeper/FateIT.java:
##########
@@ -448,6 +448,62 @@ public void testRepoFails() throws Exception {
assertTrue(fate.getException(txid).getMessage().contains("isReady() failed"));
}
+ @Test
+ public void testShutdownDoesNotFailTx() throws Exception {
+ ConfigurationCopy config = new ConfigurationCopy();
+ config.set(Property.GENERAL_THREADPOOL_SIZE, "2");
+ config.set(Property.MANAGER_FATE_THREADPOOL_SIZE, "1");
+
+ // Wait for the transaction runner to be scheduled.
+ UtilWaitThread.sleep(3000);
+
+ callStarted = new CountDownLatch(1);
+ finishCall = new CountDownLatch(1);
+
+ long txid = fate.startTransaction();
+ assertEquals(TStatus.NEW, getTxStatus(zk, txid));
+ fate.seedTransaction("TestOperation", txid, new TestOperation(NS, TID), true, "Test Op");
+ assertEquals(TStatus.SUBMITTED, getTxStatus(zk, txid));
+
+ fate.startTransactionRunners(config, new ScheduledThreadPoolExecutor(2));
+ // Wait for the transaction runner to be scheduled.
+ UtilWaitThread.sleep(3000);
Review Comment:
Is this needed w/ the call to await() below?
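
To illustrate the question, a standalone sketch (not the FateIT code) of waiting on a latch instead of a fixed sleep. The class `LatchWaitSketch` and the 30 second timeout are assumptions; `callStarted` and `finishCall` mirror the latch names in the test.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch; only the latch names come from the test in the diff.
final class LatchWaitSketch {
  /** Returns true if the worker signalled that it started before the timeout. */
  static boolean startAndAwait() {
    CountDownLatch callStarted = new CountDownLatch(1);
    CountDownLatch finishCall = new CountDownLatch(1);
    ExecutorService pool = Executors.newSingleThreadExecutor();
    try {
      pool.execute(() -> {
        callStarted.countDown(); // stand-in for the start of call()
        try {
          finishCall.await(); // block until the test releases the operation
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      });
      // No fixed sleep: await() returns as soon as the worker has counted down.
      return callStarted.await(30, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    } finally {
      finishCall.countDown(); // release the worker
      pool.shutdown();
    }
  }
}
```

Because `await()` blocks until the worker has actually started, a preceding sleep would only add time to the test.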
##########
core/src/main/java/org/apache/accumulo/core/fate/Fate.java:
##########
@@ -129,8 +144,28 @@ public void run() {
}
}
}
+ } catch (InterruptedException e) {
+ if (keepRunning.get()) {
+ runnerLog.error("Uncaught InterruptedException in FATE runner thread.", e);
+ } else {
+ // If we are shutting down then Fate.shutdown was called
+ // and ExecutorService.shutdownNow was called resulting
+ // in this exception. We will exit at the top of the loop,
+ // so continue this loop iteration normally.
+ Thread.interrupted();
+ }
} catch (Exception e) {
- runnerLog.error("Uncaught exception in FATE runner thread.", e);
+ // ZooStore wraps InterruptedException's with a RuntimeException
+ if (!keepRunning.get() && e instanceof RuntimeException && e.getCause() != null
Review Comment:
If we had a method like `isInterruptedException(Exception e)`, as mentioned
above, then we could potentially simplify and collapse these two catch blocks.
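
A rough sketch of what the collapsed handling might look like, pulled out into a standalone method so it can run on its own. `RunnerCatchSketch`, `Outcome`, and `handle` are hypothetical names; the return value stands in for the logging done in the real catch block.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch; class, enum, and method names are illustrative only.
final class RunnerCatchSketch {
  enum Outcome { SHUTDOWN_INTERRUPT_IGNORED, ERROR_LOGGED }

  // True if e or anything in its (bounded) cause chain is an InterruptedException.
  static boolean isInterruptedException(Throwable e) {
    int depth = 0;
    for (Throwable t = e; t != null && depth < 64; t = t.getCause(), depth++) {
      if (t instanceof InterruptedException) {
        return true;
      }
    }
    return false;
  }

  // Body of a single catch (Exception e) block, covering direct and wrapped interrupts.
  static Outcome handle(Exception e, AtomicBoolean keepRunning) {
    if (!keepRunning.get() && isInterruptedException(e)) {
      // Shutdown caused the interrupt: clear the flag and do not treat it as an error.
      Thread.interrupted();
      return Outcome.SHUTDOWN_INTERRUPT_IGNORED;
    }
    // Stand-in for runnerLog.error("Uncaught exception in FATE runner thread.", e)
    return Outcome.ERROR_LOGGED;
  }
}
```

An interrupt seen while `keepRunning` is still true falls through to the error path, which matches the behavior of the two separate catch blocks in the diff.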
##########
core/src/main/java/org/apache/accumulo/core/fate/Fate.java:
##########
@@ -421,6 +458,7 @@ public Exception getException(long tid) {
* Flags that FATE threadpool to clear out and end. Does not actively stop running FATE processes.
*/
public void shutdown(boolean wait) {
+ log.info("Shutdown called on Fate");
Review Comment:
Could log the wait argument.
##########
core/src/main/java/org/apache/accumulo/core/fate/Fate.java:
##########
@@ -99,10 +99,25 @@ public void run() {
store.setStatus(tid, IN_PROGRESS);
}
op = executeCall(tid, op);
+ // It's possible that a Fate operation impl
+ // may not do the right thing with an
+ // InterruptedException.
+ if (Thread.currentThread().isInterrupted()) {
Review Comment:
It does not hurt to look for this, but we may not need to. As long as we do
not call transition to failed when the thread pool is shut down, it's ok.
Normally the process will die soon, so it does not matter if the threads do not
die as soon as they could.
##########
test/src/main/java/org/apache/accumulo/test/fate/zookeeper/FateIT.java:
##########
@@ -448,6 +448,62 @@ public void testRepoFails() throws Exception {
assertTrue(fate.getException(txid).getMessage().contains("isReady() failed"));
}
+ @Test
+ public void testShutdownDoesNotFailTx() throws Exception {
+ ConfigurationCopy config = new ConfigurationCopy();
+ config.set(Property.GENERAL_THREADPOOL_SIZE, "2");
+ config.set(Property.MANAGER_FATE_THREADPOOL_SIZE, "1");
+
+ // Wait for the transaction runner to be scheduled.
+ UtilWaitThread.sleep(3000);
+
+ callStarted = new CountDownLatch(1);
+ finishCall = new CountDownLatch(1);
+
+ long txid = fate.startTransaction();
+ assertEquals(TStatus.NEW, getTxStatus(zk, txid));
+ fate.seedTransaction("TestOperation", txid, new TestOperation(NS, TID), true, "Test Op");
+ assertEquals(TStatus.SUBMITTED, getTxStatus(zk, txid));
+
+ fate.startTransactionRunners(config, new ScheduledThreadPoolExecutor(2));
+ // Wait for the transaction runner to be scheduled.
+ UtilWaitThread.sleep(3000);
+
+ // wait for call() to be called
+ callStarted.await();
+ assertEquals(IN_PROGRESS, getTxStatus(zk, txid));
+
+ // shutdown fate
+ fate.shutdown(true);
+
+ // tell the op to exit the method
+ finishCall.countDown();
Review Comment:
May not need to make this call because the thread was interrupted.
Could change the inCall() method to set a static boolean sawInterrupt to
true when an InterruptedException happens. Then this method could wait for
that to be true here.
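
A standalone sketch of that idea, assuming the operation blocks on a latch the way the test's `finishCall` suggests. `SawInterruptSketch`, `runAndShutdown`, and the timeouts are hypothetical; a plain executor's `shutdownNow()` stands in for `fate.shutdown(true)`.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch; only sawInterrupt and inCall are names taken from the comment.
final class SawInterruptSketch {
  static volatile boolean sawInterrupt = false;

  // Stand-in for inCall(): signal start, then block until released or interrupted.
  static void inCall(CountDownLatch callStarted, CountDownLatch finishCall) {
    callStarted.countDown();
    try {
      finishCall.await();
    } catch (InterruptedException e) {
      sawInterrupt = true; // record that shutdown interrupted the operation
    }
  }

  /** Returns true if the worker observed an interrupt after shutdownNow(). */
  static boolean runAndShutdown() {
    sawInterrupt = false;
    CountDownLatch callStarted = new CountDownLatch(1);
    CountDownLatch finishCall = new CountDownLatch(1); // never counted down here
    ExecutorService pool = Executors.newSingleThreadExecutor();
    pool.execute(() -> inCall(callStarted, finishCall));
    try {
      if (!callStarted.await(30, TimeUnit.SECONDS)) {
        return false;
      }
      pool.shutdownNow(); // interrupts the worker thread
      long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(30);
      while (!sawInterrupt && System.nanoTime() < deadline) {
        Thread.sleep(10); // poll until the flag is set or the deadline passes
      }
      return sawInterrupt;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    } finally {
      pool.shutdownNow(); // harmless if already shut down
    }
  }
}
```

The test never has to call `finishCall.countDown()` in this version; a latch in place of the boolean would avoid the polling loop.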
##########
core/src/main/java/org/apache/accumulo/core/fate/Fate.java:
##########
@@ -421,6 +458,7 @@ public Exception getException(long tid) {
* Flags that FATE threadpool to clear out and end. Does not actively stop running FATE processes.
*/
public void shutdown(boolean wait) {
+ log.info("Shutdown called on Fate");
keepRunning.set(false);
Review Comment:
```suggestion
// important this is set before shutdownNow is called as the background
// threads will check this to see if shutdown related errors should be ignored.
keepRunning.set(false);
```
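
To show why the ordering matters, a small standalone sketch. `ShutdownOrderSketch` and its members are hypothetical and only mimic the `keepRunning` flag and the `shutdownNow` call described in the diff; this is not the Fate implementation.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch; only keepRunning mirrors a name from the diff.
final class ShutdownOrderSketch {
  private final AtomicBoolean keepRunning = new AtomicBoolean(true);
  private final ExecutorService pool = Executors.newSingleThreadExecutor();
  private final CountDownLatch started = new CountDownLatch(1);
  private final CountDownLatch done = new CountDownLatch(1);
  // Value of keepRunning that the worker saw when it was interrupted.
  private volatile boolean keepRunningAtInterrupt = true;

  private void start() {
    pool.execute(() -> {
      started.countDown();
      try {
        Thread.sleep(60_000); // stand-in for a blocked FATE operation
      } catch (InterruptedException e) {
        keepRunningAtInterrupt = keepRunning.get();
      } finally {
        done.countDown();
      }
    });
  }

  private void shutdown() {
    keepRunning.set(false); // must happen before the interrupt is delivered
    pool.shutdownNow();
  }

  /** Returns true if the interrupted worker saw keepRunning == false. */
  static boolean workerSawShutdownFlag() {
    ShutdownOrderSketch s = new ShutdownOrderSketch();
    s.start();
    try {
      if (!s.started.await(30, TimeUnit.SECONDS)) {
        return false;
      }
      s.shutdown();
      return s.done.await(30, TimeUnit.SECONDS) && !s.keepRunningAtInterrupt;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    } finally {
      s.pool.shutdownNow(); // harmless if already shut down
    }
  }
}
```

If the two statements in `shutdown()` were swapped, the worker could handle the interrupt while the flag still read true and treat a shutdown interrupt as a real error.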