This is an automated email from the ASF dual-hosted git repository.
vavrtom pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git
The following commit(s) were added to refs/heads/main by this push:
new 65250de5e5 QPID-8664: [Broker-J] Fixed failing TwoNodeTest (related to
4/10) (#285)
65250de5e5 is described below
commit 65250de5e5eb3cdf44821591f46ffb52955f8f05
Author: Daniil Kirilyuk <[email protected]>
AuthorDate: Tue Apr 29 11:22:56 2025 +0200
QPID-8664: [Broker-J] Fixed failing TwoNodeTest (related to 4/10) (#285)
---
.../configuration/updater/TaskExecutorImpl.java | 50 +++++++++++++++-------
1 file changed, 34 insertions(+), 16 deletions(-)
diff --git
a/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutorImpl.java
b/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutorImpl.java
index d8b2db0f94..4d2bda9536 100644
---
a/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutorImpl.java
+++
b/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutorImpl.java
@@ -95,6 +95,8 @@ public class TaskExecutorImpl implements TaskExecutor
QpidByteBuffer.createQpidByteBufferTrackingThreadFactory(runnable ->
{
_taskThread = new TaskThread(runnable, _name,
TaskExecutorImpl.this);
+ _taskThread.setUncaughtExceptionHandler((thread, throwable) ->
+ LOGGER.error("Uncaught exception in task thread",
throwable));
return _taskThread;
});
_executor = new ThreadPoolExecutor(1, 1, 0L,
TimeUnit.MILLISECONDS, workQueue, factory);
@@ -112,7 +114,13 @@ public class TaskExecutorImpl implements TaskExecutor
{
LOGGER.debug("Stopping task executor {} immediately", _name);
final List<Runnable> cancelledTasks = executor.shutdownNow();
- cancelledTasks.forEach(runnable -> ((RunnableWrapper)
runnable).cancel());
+ cancelledTasks.forEach(runnable ->
+ {
+ if (runnable instanceof RunnableWrapper<?, ?>
runnableWrapper)
+ {
+ runnableWrapper.cancel();
+ }
+ });
_executor = null;
_taskThread = null;
LOGGER.debug("Task executor was stopped immediately. Number of
unfinished tasks: {}", cancelledTasks.size());
@@ -157,17 +165,15 @@ public class TaskExecutorImpl implements TaskExecutor
final T result = task.execute();
return CompletableFuture.completedFuture(result);
}
- else
- {
- if (LOGGER.isTraceEnabled())
- {
- LOGGER.trace("Submitting {} to executor {}", task, _name);
- }
- final CompletableFuture<T> future = new CompletableFuture<>();
- _executor.execute(new RunnableWrapper<>(task, future));
- return future;
+ if (LOGGER.isTraceEnabled())
+ {
+ LOGGER.trace("Submitting {} to executor {}", task, _name);
}
+
+ final CompletableFuture<T> future = new CompletableFuture<>();
+ _executor.execute(new RunnableWrapper<>(task, future));
+ return future;
}
@Override
@@ -209,14 +215,15 @@ public class TaskExecutorImpl implements TaskExecutor
return null;
}
- if (_principalAccessor == null || _principalAccessor.getPrincipal() ==
null
- ||
contextSubject.getPrincipals().contains(_principalAccessor.getPrincipal()))
+ final Principal accessorPrincipal = _principalAccessor == null ? null
: _principalAccessor.getPrincipal();
+
+ if (accessorPrincipal == null ||
contextSubject.getPrincipals().contains(accessorPrincipal))
{
return contextSubject;
}
final Set<Principal> principals = new
HashSet<>(contextSubject.getPrincipals());
- principals.add(_principalAccessor.getPrincipal());
+ principals.add(accessorPrincipal);
return SUBJECT_CACHE.get(principals, key ->
createSubjectWithPrincipals(key, contextSubject));
}
@@ -283,7 +290,7 @@ public class TaskExecutorImpl implements TaskExecutor
}
if (LOGGER.isDebugEnabled())
{
- LOGGER.debug("Performing {}", _userTask);
+ LOGGER.debug("Performing {}", this);
}
final T result = Subject.doAs(_contextSubject,
(PrivilegedAction<T>) () ->
{
@@ -304,7 +311,7 @@ public class TaskExecutorImpl implements TaskExecutor
{
if (LOGGER.isDebugEnabled())
{
- LOGGER.debug("{} failed to perform successfully",
_userTask);
+ LOGGER.debug("{} failed to perform successfully", this);
}
if (throwable instanceof RuntimeException)
{
@@ -320,7 +327,7 @@ public class TaskExecutorImpl implements TaskExecutor
}
}
- LOGGER.debug("{} performed successfully with result: {}",
_userTask, result);
+ LOGGER.debug("{} performed successfully with result: {}", this,
result);
_future.complete(result);
}
@@ -328,6 +335,17 @@ public class TaskExecutorImpl implements TaskExecutor
{
_future.completeExceptionally(new CancellationException("Task was
cancelled"));
}
+
+ @Override
+ public String toString()
+ {
+ final String arguments = _userTask.getArguments();
+ if (arguments == null)
+ {
+ return "Task['%s' on '%s']".formatted(_userTask.getAction(),
_userTask.getObject());
+ }
+ return "Task['%s' on '%s' with arguments
'%s']".formatted(_userTask.getAction(), _userTask.getObject(), arguments);
+ }
}
private class ImmediateIfSameThreadExecutor implements Executor
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]