kevinrr888 commented on code in PR #5817:
URL: https://github.com/apache/accumulo/pull/5817#discussion_r2298294247
##########
core/src/main/java/org/apache/accumulo/core/fate/FateExecutor.java:
##########
@@ -105,19 +105,26 @@ public FateExecutor(Fate<T> fate, T environment, Set<Fate.FateOperation> fateOps
*/
protected void resizeFateExecutor(Map<Set<Fate.FateOperation>,Integer> poolConfigs,
long idleCheckIntervalMillis) {
- final var pool = transactionExecutor;
final int configured = poolConfigs.get(fateOps);
- ThreadPools.resizePool(pool, () -> configured, poolName);
- synchronized (runningTxRunners) {
- final int needed = configured - runningTxRunners.size();
- if (needed > 0) {
- // If the pool grew, then ensure that there is a TransactionRunner for each thread
+ ThreadPools.resizePool(transactionExecutor, () -> configured, poolName);
+ final int needed = configured - runningTxRunners.size();
+ if (needed > 0) {
+ // If the pool grew, then ensure that there is a TransactionRunner for each thread
+ log.trace("FateExecutor {} needs {} more TransactionRunners", fateOps, needed);
+ synchronized (runningTxRunners) {
Review Comment:
I still think it would be best to keep the entire if-elseif-else locked
under runningTxRunners to prevent potential concurrency issues. As written,
this code locks to get runningTxRunners.size(), unlocks, then locks again on
the assumption that runningTxRunners.size() is unchanged, which it might not be.
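A minimal sketch of what I mean, not the actual FateExecutor code. The class
`RunnerPool` and method `resize` are hypothetical, runners are modeled as plain
integers, and I am assuming runningTxRunners is a synchronized set, so a bare
`size()` call takes and releases the lock on its own. Holding the monitor across
both the size check and the additions keeps `needed` from going stale:

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for the executor; not the real FateExecutor.
class RunnerPool {
  // Assumption: a synchronized set, which uses the wrapper itself as its mutex.
  private final Set<Integer> runningTxRunners = Collections.synchronizedSet(new HashSet<>());
  private int nextId = 0;

  // Holds the lock across the size check and the update, so no other thread
  // can change the set between computing 'needed' and acting on it.
  int resize(int configured) {
    synchronized (runningTxRunners) {
      final int needed = configured - runningTxRunners.size();
      if (needed > 0) {
        // The pool grew: add one runner per missing thread.
        for (int i = 0; i < needed; i++) {
          runningTxRunners.add(nextId++);
        }
      }
      // Shrinking is left out of this sketch; the set is unchanged in that case.
      return runningTxRunners.size();
    }
  }
}
```

The elseif/else branches would sit inside the same synchronized block.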
##########
core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java:
##########
@@ -112,6 +112,8 @@ public UserFateStore(ClientContext context, String tableName, ZooUtil.LockID loc
super(lockID, isLockHeld, maxDeferred, fateIdGenerator);
this.context = Objects.requireNonNull(context);
this.tableName = Objects.requireNonNull(tableName);
+ Preconditions.checkArgument(this.context.tableOperations().exists(tableName),
+ "user fate store table does not exist.");
Review Comment:
Could add the table name to the message here.
##########
core/src/main/java/org/apache/accumulo/core/fate/FateExecutor.java:
##########
@@ -201,6 +209,12 @@ protected Set<Fate.FateOperation> getFateOps() {
return fateOps;
}
+ public void printInfo() {
+ synchronized (runningTxRunners) {
+ runningTxRunners.forEach(r -> r.printInfo());
+ }
+ }
+
Review Comment:
#5824 does a similar thing, but adds the info to the existing toString. I
think the toString additions are a bit cleaner.
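Roughly the shape I have in mind, as a sketch only. This is not the code from
#5824, and `ExecutorSummary`, `poolName`, and `addRunner` are hypothetical names
used for illustration. The runner info is folded into toString under the same
lock, so callers just log the object instead of calling a separate print method:

```java
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical stand-in; not the real FateExecutor or the change in #5824.
class ExecutorSummary {
  private final String poolName;
  // Assumption: runner info is kept as strings in a synchronized, ordered set.
  private final Set<String> runningTxRunners =
      Collections.synchronizedSet(new LinkedHashSet<>());

  ExecutorSummary(String poolName) {
    this.poolName = poolName;
  }

  void addRunner(String runnerInfo) {
    runningTxRunners.add(runnerInfo);
  }

  @Override
  public String toString() {
    // Lock so the runner info is rendered from a consistent snapshot.
    synchronized (runningTxRunners) {
      return "ExecutorSummary[poolName=" + poolName + ", runners=" + runningTxRunners + "]";
    }
  }
}
```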
##########
test/src/main/java/org/apache/accumulo/test/fate/FlakyFate.java:
##########
@@ -39,11 +39,6 @@ public class FlakyFate<T> extends Fate<T> {
public FlakyFate(T environment, FateStore<T> store, Function<Repo<T>,String> toLogStrFunc,
AccumuloConfiguration conf) {
super(environment, store, false, toLogStrFunc, conf, new ScheduledThreadPoolExecutor(2));
- }
-
- @Override
- protected void startFateExecutors(T environment, AccumuloConfiguration conf,
- Set<FateExecutor<T>> fateExecutors) {
for (var poolConfig : getPoolConfigurations(conf).entrySet()) {
fateExecutors.add(
new FlakyFateExecutor<>(this, environment, poolConfig.getKey(), poolConfig.getValue()));
Review Comment:
Is any of this still needed? I think it can be removed if the
FatePoolWatcher runs for FlakyFate.