This is an automated email from the ASF dual-hosted git repository. vavrtom pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git
The following commit(s) were added to refs/heads/main by this push: new 65250de5e5 QPID-8664: [Broker-J] Fixed failing TwoNodeTest (related to 4/10) (#285) 65250de5e5 is described below commit 65250de5e5eb3cdf44821591f46ffb52955f8f05 Author: Daniil Kirilyuk <daniel.kiril...@gmail.com> AuthorDate: Tue Apr 29 11:22:56 2025 +0200 QPID-8664: [Broker-J] Fixed failing TwoNodeTest (related to 4/10) (#285) --- .../configuration/updater/TaskExecutorImpl.java | 50 +++++++++++++++------- 1 file changed, 34 insertions(+), 16 deletions(-) diff --git a/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutorImpl.java b/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutorImpl.java index d8b2db0f94..4d2bda9536 100644 --- a/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutorImpl.java +++ b/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutorImpl.java @@ -95,6 +95,8 @@ public class TaskExecutorImpl implements TaskExecutor QpidByteBuffer.createQpidByteBufferTrackingThreadFactory(runnable -> { _taskThread = new TaskThread(runnable, _name, TaskExecutorImpl.this); + _taskThread.setUncaughtExceptionHandler((thread, throwable) -> + LOGGER.error("Uncaught exception in task thread", throwable)); return _taskThread; }); _executor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, workQueue, factory); @@ -112,7 +114,13 @@ public class TaskExecutorImpl implements TaskExecutor { LOGGER.debug("Stopping task executor {} immediately", _name); final List<Runnable> cancelledTasks = executor.shutdownNow(); - cancelledTasks.forEach(runnable -> ((RunnableWrapper) runnable).cancel()); + cancelledTasks.forEach(runnable -> + { + if (runnable instanceof RunnableWrapper<?, ?> runnableWrapper) + { + runnableWrapper.cancel(); + } + }); _executor = null; _taskThread = null; LOGGER.debug("Task executor was stopped immediately. Number of unfinished tasks: {}", cancelledTasks.size()); @@ -157,17 +165,15 @@ public class TaskExecutorImpl implements TaskExecutor final T result = task.execute(); return CompletableFuture.completedFuture(result); } - else - { - if (LOGGER.isTraceEnabled()) - { - LOGGER.trace("Submitting {} to executor {}", task, _name); - } - final CompletableFuture<T> future = new CompletableFuture<>(); - _executor.execute(new RunnableWrapper<>(task, future)); - return future; + if (LOGGER.isTraceEnabled()) + { + LOGGER.trace("Submitting {} to executor {}", task, _name); } + + final CompletableFuture<T> future = new CompletableFuture<>(); + _executor.execute(new RunnableWrapper<>(task, future)); + return future; } @Override @@ -209,14 +215,15 @@ public class TaskExecutorImpl implements TaskExecutor return null; } - if (_principalAccessor == null || _principalAccessor.getPrincipal() == null - || contextSubject.getPrincipals().contains(_principalAccessor.getPrincipal())) + final Principal accessorPrincipal = _principalAccessor == null ? null : _principalAccessor.getPrincipal(); + + if (accessorPrincipal == null || contextSubject.getPrincipals().contains(accessorPrincipal)) { return contextSubject; } final Set<Principal> principals = new HashSet<>(contextSubject.getPrincipals()); - principals.add(_principalAccessor.getPrincipal()); + principals.add(accessorPrincipal); return SUBJECT_CACHE.get(principals, key -> createSubjectWithPrincipals(key, contextSubject)); } @@ -283,7 +290,7 @@ public class TaskExecutorImpl implements TaskExecutor } if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Performing {}", _userTask); + LOGGER.debug("Performing {}", this); } final T result = Subject.doAs(_contextSubject, (PrivilegedAction<T>) () -> { @@ -304,7 +311,7 @@ public class TaskExecutorImpl implements TaskExecutor { if (LOGGER.isDebugEnabled()) { - LOGGER.debug("{} failed to perform successfully", _userTask); + LOGGER.debug("{} failed to perform successfully", this); } if (throwable instanceof RuntimeException) { @@ -320,7 +327,7 @@ public class TaskExecutorImpl implements TaskExecutor } } - LOGGER.debug("{} performed successfully with result: {}", _userTask, result); + LOGGER.debug("{} performed successfully with result: {}", this, result); _future.complete(result); } @@ -328,6 +335,17 @@ public class TaskExecutorImpl implements TaskExecutor { _future.completeExceptionally(new CancellationException("Task was cancelled")); } + + @Override + public String toString() + { + final String arguments = _userTask.getArguments(); + if (arguments == null) + { + return "Task['%s' on '%s']".formatted(_userTask.getAction(), _userTask.getObject()); + } + return "Task['%s' on '%s' with arguments '%s']".formatted(_userTask.getAction(), _userTask.getObject(), arguments); + } } private class ImmediateIfSameThreadExecutor implements Executor --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org