This is an automated email from the ASF dual-hosted git repository.

vavrtom pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git


The following commit(s) were added to refs/heads/main by this push:
     new 65250de5e5 QPID-8664: [Broker-J] Fixed failing TwoNodeTest (related to 4/10) (#285)
65250de5e5 is described below

commit 65250de5e5eb3cdf44821591f46ffb52955f8f05
Author: Daniil Kirilyuk <daniel.kiril...@gmail.com>
AuthorDate: Tue Apr 29 11:22:56 2025 +0200

    QPID-8664: [Broker-J] Fixed failing TwoNodeTest (related to 4/10) (#285)
---
 .../configuration/updater/TaskExecutorImpl.java    | 50 +++++++++++++++-------
 1 file changed, 34 insertions(+), 16 deletions(-)

diff --git a/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutorImpl.java b/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutorImpl.java
index d8b2db0f94..4d2bda9536 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutorImpl.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutorImpl.java
@@ -95,6 +95,8 @@ public class TaskExecutorImpl implements TaskExecutor
                     
QpidByteBuffer.createQpidByteBufferTrackingThreadFactory(runnable ->
             {
                 _taskThread = new TaskThread(runnable, _name, 
TaskExecutorImpl.this);
+                _taskThread.setUncaughtExceptionHandler((thread, throwable) ->
+                        LOGGER.error("Uncaught exception in task thread", 
throwable));
                 return _taskThread;
             });
             _executor = new ThreadPoolExecutor(1, 1, 0L, 
TimeUnit.MILLISECONDS, workQueue, factory);
@@ -112,7 +114,13 @@ public class TaskExecutorImpl implements TaskExecutor
             {
                 LOGGER.debug("Stopping task executor {} immediately", _name);
                 final List<Runnable> cancelledTasks = executor.shutdownNow();
-                cancelledTasks.forEach(runnable -> ((RunnableWrapper) 
runnable).cancel());
+                cancelledTasks.forEach(runnable ->
+                {
+                    if (runnable instanceof RunnableWrapper<?, ?> 
runnableWrapper)
+                    {
+                        runnableWrapper.cancel();
+                    }
+                });
                 _executor = null;
                 _taskThread = null;
                 LOGGER.debug("Task executor was stopped immediately. Number of 
unfinished tasks: {}", cancelledTasks.size());
@@ -157,17 +165,15 @@ public class TaskExecutorImpl implements TaskExecutor
             final T result = task.execute();
             return CompletableFuture.completedFuture(result);
         }
-        else
-        {
-            if (LOGGER.isTraceEnabled())
-            {
-                LOGGER.trace("Submitting {} to executor {}", task, _name);
-            }
 
-            final CompletableFuture<T> future = new CompletableFuture<>();
-            _executor.execute(new RunnableWrapper<>(task, future));
-            return future;
+        if (LOGGER.isTraceEnabled())
+        {
+            LOGGER.trace("Submitting {} to executor {}", task, _name);
         }
+
+        final CompletableFuture<T> future = new CompletableFuture<>();
+        _executor.execute(new RunnableWrapper<>(task, future));
+        return future;
     }
 
     @Override
@@ -209,14 +215,15 @@ public class TaskExecutorImpl implements TaskExecutor
             return null;
         }
 
-        if (_principalAccessor == null || _principalAccessor.getPrincipal() == 
null
-                || 
contextSubject.getPrincipals().contains(_principalAccessor.getPrincipal()))
+        final Principal accessorPrincipal = _principalAccessor == null ? null 
: _principalAccessor.getPrincipal();
+
+        if (accessorPrincipal == null || 
contextSubject.getPrincipals().contains(accessorPrincipal))
         {
             return contextSubject;
         }
 
         final Set<Principal> principals = new 
HashSet<>(contextSubject.getPrincipals());
-        principals.add(_principalAccessor.getPrincipal());
+        principals.add(accessorPrincipal);
 
         return SUBJECT_CACHE.get(principals, key -> 
createSubjectWithPrincipals(key, contextSubject));
     }
@@ -283,7 +290,7 @@ public class TaskExecutorImpl implements TaskExecutor
             }
             if (LOGGER.isDebugEnabled())
             {
-                LOGGER.debug("Performing {}", _userTask);
+                LOGGER.debug("Performing {}", this);
             }
             final T result = Subject.doAs(_contextSubject, 
(PrivilegedAction<T>) () ->
             {
@@ -304,7 +311,7 @@ public class TaskExecutorImpl implements TaskExecutor
             {
                 if (LOGGER.isDebugEnabled())
                 {
-                    LOGGER.debug("{} failed to perform successfully", 
_userTask);
+                    LOGGER.debug("{} failed to perform successfully", this);
                 }
                 if (throwable instanceof RuntimeException)
                 {
@@ -320,7 +327,7 @@ public class TaskExecutorImpl implements TaskExecutor
                 }
             }
 
-            LOGGER.debug("{} performed successfully with result: {}", 
_userTask, result);
+            LOGGER.debug("{} performed successfully with result: {}", this, 
result);
             _future.complete(result);
         }
 
@@ -328,6 +335,17 @@ public class TaskExecutorImpl implements TaskExecutor
         {
             _future.completeExceptionally(new CancellationException("Task was 
cancelled"));
         }
+
+        @Override
+        public String toString()
+        {
+            final String arguments = _userTask.getArguments();
+            if (arguments == null)
+            {
+                return "Task['%s' on '%s']".formatted(_userTask.getAction(), 
_userTask.getObject());
+            }
+            return "Task['%s' on '%s' with arguments 
'%s']".formatted(_userTask.getAction(), _userTask.getObject(), arguments);
+        }
     }
 
     private class ImmediateIfSameThreadExecutor implements Executor


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to