kevinrr888 commented on code in PR #5817:
URL: https://github.com/apache/accumulo/pull/5817#discussion_r2293704935
##########
core/src/main/java/org/apache/accumulo/core/fate/FateExecutor.java:
##########
@@ -412,7 +418,9 @@ public void run() {
}
} finally {
log.trace("A TransactionRunner is exiting...");
- Preconditions.checkState(runningTxRunners.remove(this));
+ synchronized (runningTxRunners) {
+ Preconditions.checkState(runningTxRunners.remove(this));
+ }
Review Comment:
no need to sync here since `this.runningTxRunners =
Collections.synchronizedSet(new HashSet<>());`
##########
core/src/main/java/org/apache/accumulo/core/fate/FateExecutor.java:
##########
@@ -105,15 +105,22 @@ public FateExecutor(Fate<T> fate, T environment,
Set<Fate.FateOperation> fateOps
protected void resizeFateExecutor(Map<Set<Fate.FateOperation>,Integer>
poolConfigs,
long idleCheckIntervalMillis) {
final var pool = transactionExecutor;
- final var runningTxRunners = getRunningTxRunners();
+ final var runningTxRunnersCopy = getRunningTxRunners();
final int configured = poolConfigs.get(fateOps);
ThreadPools.resizePool(pool, () -> configured, poolName);
- final int needed = configured - runningTxRunners.size();
+ final int needed = configured - runningTxRunnersCopy.size();
if (needed > 0) {
// If the pool grew, then ensure that there is a TransactionRunner for
each thread
for (int i = 0; i < needed; i++) {
try {
- pool.execute(new TransactionRunner());
+ final TransactionRunner tr = new TransactionRunner();
+ synchronized (runningTxRunners) {
+ if (pool.isShutdown() || pool.isTerminating()) {
+ return;
+ }
+ runningTxRunners.add(tr);
+ pool.execute(tr);
+ }
Review Comment:
I like this switch to add the runner before calling execute, this is safer.
Some comments:
1) Should we maybe just avoid the use of the copy and just lock the
runningTxRunners for the entire if-elseif-else block? Using the copy and the
real seems a bit sketchy, but I'm not sure if it's actually wrong/a problem.
The copy was sketching me out when I was working on #5813 so I removed it there
as well.
2) `pool.isShutdown() || pool.isTerminating()` --> `pool.isShutdown()` might
be clearer. I'm pretty sure `pool.isTerminating()` is true only if
`pool.isShutdown()` is true
##########
core/src/main/java/org/apache/accumulo/core/fate/FateExecutor.java:
##########
@@ -105,15 +105,22 @@ public FateExecutor(Fate<T> fate, T environment,
Set<Fate.FateOperation> fateOps
protected void resizeFateExecutor(Map<Set<Fate.FateOperation>,Integer>
poolConfigs,
long idleCheckIntervalMillis) {
final var pool = transactionExecutor;
- final var runningTxRunners = getRunningTxRunners();
+ final var runningTxRunnersCopy = getRunningTxRunners();
final int configured = poolConfigs.get(fateOps);
ThreadPools.resizePool(pool, () -> configured, poolName);
- final int needed = configured - runningTxRunners.size();
+ final int needed = configured - runningTxRunnersCopy.size();
if (needed > 0) {
// If the pool grew, then ensure that there is a TransactionRunner for
each thread
for (int i = 0; i < needed; i++) {
try {
- pool.execute(new TransactionRunner());
+ final TransactionRunner tr = new TransactionRunner();
+ synchronized (runningTxRunners) {
+ if (pool.isShutdown() || pool.isTerminating()) {
+ return;
+ }
+ runningTxRunners.add(tr);
+ pool.execute(tr);
+ }
} catch (RejectedExecutionException e) {
// RejectedExecutionException could be shutting down
if (pool.isShutdown()) {
Review Comment:
I was thinking this case should probably be removed, but it might still be
possible if the pool is shutdown after the `if (pool.isShutdown() ||
pool.isTerminating())` check but before `pool.execute(tr);`... So I think we
still need it
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]