This is an automated email from the ASF dual-hosted git repository.

vavrtom pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git


The following commit(s) were added to refs/heads/main by this push:
     new 976e2a3014 QPID-8704: [Broker-J] Optimize TaskExecutorImpl subject 
handling and execution (#304)
976e2a3014 is described below

commit 976e2a3014a3b110da04c31cfee7cc0ab26eeccc
Author: Daniil Kirilyuk <[email protected]>
AuthorDate: Thu Jul 31 12:00:46 2025 +0200

    QPID-8704: [Broker-J] Optimize TaskExecutorImpl subject handling and 
execution (#304)
    
    * QPID-8704: [Broker-J] Optimize TaskExecutorImpl subject handling and 
execution
    
    * QPID-8704: [Broker-J] Restored method's private modifier
    
    ---------
    
    Co-authored-by: vavrtom <[email protected]>
---
 .../configuration/updater/TaskExecutorImpl.java    | 166 +++++++++------------
 .../updater/TaskExecutorWithPrincipalTest.java     | 163 --------------------
 2 files changed, 74 insertions(+), 255 deletions(-)

diff --git 
a/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutorImpl.java
 
b/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutorImpl.java
index 4d2bda9536..1d04d193ed 100644
--- 
a/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutorImpl.java
+++ 
b/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutorImpl.java
@@ -24,26 +24,21 @@ package org.apache.qpid.server.configuration.updater;
 import java.security.AccessController;
 import java.security.Principal;
 import java.security.PrivilegedAction;
-import java.time.Duration;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
 
 import javax.security.auth.Subject;
 
-import com.github.benmanes.caffeine.cache.Cache;
-import com.github.benmanes.caffeine.cache.Caffeine;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -54,17 +49,12 @@ public class TaskExecutorImpl implements TaskExecutor
 {
     private static final String TASK_EXECUTION_THREAD_NAME = "Broker-Config";
     private static final Logger LOGGER = 
LoggerFactory.getLogger(TaskExecutorImpl.class);
-    private static final Cache<Set<Principal>, Subject> SUBJECT_CACHE = 
Caffeine.newBuilder()
-            .expireAfterAccess(Duration.ofMinutes(5))
-            .maximumSize(1000)
-            .build();
 
     private final PrincipalAccessor _principalAccessor;
     private final AtomicBoolean _running = new AtomicBoolean();
-    private final ImmediateIfSameThreadExecutor _wrappedExecutor = new 
ImmediateIfSameThreadExecutor();
     private final String _name;
 
-    private volatile Thread _taskThread;
+    private volatile TaskThread _taskThread;
     private volatile ExecutorService _executor;
 
     public TaskExecutorImpl()
@@ -89,18 +79,25 @@ public class TaskExecutorImpl implements TaskExecutor
     {
         if (_running.compareAndSet(false, true))
         {
-            LOGGER.debug("Starting task executor {}", _name);
+            if (LOGGER.isDebugEnabled())
+            {
+                LOGGER.debug("Starting task executor {}", _name);
+            }
             final BlockingQueue<Runnable> workQueue = new 
LinkedBlockingQueue<>();
             final ThreadFactory factory =
                     
QpidByteBuffer.createQpidByteBufferTrackingThreadFactory(runnable ->
             {
-                _taskThread = new TaskThread(runnable, _name, 
TaskExecutorImpl.this);
-                _taskThread.setUncaughtExceptionHandler((thread, throwable) ->
+                final TaskThread taskThread = new TaskThread(runnable, _name, 
TaskExecutorImpl.this);
+                taskThread.setUncaughtExceptionHandler((thread, throwable) ->
                         LOGGER.error("Uncaught exception in task thread", 
throwable));
-                return _taskThread;
+                _taskThread = taskThread;
+                return taskThread;
             });
             _executor = new ThreadPoolExecutor(1, 1, 0L, 
TimeUnit.MILLISECONDS, workQueue, factory);
-            LOGGER.debug("Task executor is started");
+            if (LOGGER.isDebugEnabled())
+            {
+                LOGGER.debug("Task executor is started");
+            }
         }
     }
 
@@ -112,7 +109,10 @@ public class TaskExecutorImpl implements TaskExecutor
             final ExecutorService executor = _executor;
             if (executor != null)
             {
-                LOGGER.debug("Stopping task executor {} immediately", _name);
+                if (LOGGER.isDebugEnabled())
+                {
+                    LOGGER.debug("Stopping task executor {} immediately", 
_name);
+                }
                 final List<Runnable> cancelledTasks = executor.shutdownNow();
                 cancelledTasks.forEach(runnable ->
                 {
@@ -123,9 +123,11 @@ public class TaskExecutorImpl implements TaskExecutor
                 });
                 _executor = null;
                 _taskThread = null;
-                LOGGER.debug("Task executor was stopped immediately. Number of 
unfinished tasks: {}", cancelledTasks.size());
+                if (LOGGER.isDebugEnabled())
+                {
+                    LOGGER.debug("Task executor was stopped immediately. 
Number of unfinished tasks: {}", cancelledTasks.size());
+                }
             }
-            SUBJECT_CACHE.invalidateAll();
         }
     }
 
@@ -137,13 +139,18 @@ public class TaskExecutorImpl implements TaskExecutor
             final ExecutorService executor = _executor;
             if (executor != null)
             {
-                LOGGER.debug("Stopping task executor {}", _name);
+                if (LOGGER.isDebugEnabled())
+                {
+                    LOGGER.debug("Stopping task executor {}", _name);
+                }
                 executor.shutdown();
                 _executor = null;
                 _taskThread = null;
-                LOGGER.debug("Task executor is stopped");
+                if (LOGGER.isDebugEnabled())
+                {
+                    LOGGER.debug("Task executor is stopped");
+                }
             }
-            SUBJECT_CACHE.invalidateAll();
         }
     }
 
@@ -172,7 +179,7 @@ public class TaskExecutorImpl implements TaskExecutor
         }
 
         final CompletableFuture<T> future = new CompletableFuture<>();
-        _executor.execute(new RunnableWrapper<>(task, future));
+        _executor.execute(new RunnableWrapper<>(task, future, 
effectiveSubject()));
         return future;
     }
 
@@ -183,7 +190,15 @@ public class TaskExecutorImpl implements TaskExecutor
         {
             LOGGER.trace("Running runnable {} through executor interface", 
command);
         }
-        _wrappedExecutor.execute(command);
+        if (isTaskExecutorThread() || (_executor == null && 
(Thread.currentThread() instanceof TaskThread &&
+                ((TaskThread)Thread.currentThread()).getTaskExecutor() == 
TaskExecutorImpl.this)))
+        {
+            command.run();
+        }
+        else
+        {
+            _executor.execute(new RunnableWrapper<>(command, 
effectiveSubject()));
+        }
     }
 
     @Override
@@ -206,7 +221,7 @@ public class TaskExecutorImpl implements TaskExecutor
         }
     }
 
-    private Subject getCachedSubject()
+    private Subject effectiveSubject()
     {
         final Subject contextSubject = 
Subject.getSubject(AccessController.getContext());
 
@@ -225,65 +240,44 @@ public class TaskExecutorImpl implements TaskExecutor
         final Set<Principal> principals = new 
HashSet<>(contextSubject.getPrincipals());
         principals.add(accessorPrincipal);
 
-        return SUBJECT_CACHE.get(principals, key -> 
createSubjectWithPrincipals(key, contextSubject));
+        return new Subject(contextSubject.isReadOnly(), principals, 
contextSubject.getPublicCredentials(), contextSubject.getPrivateCredentials());
     }
 
-    Subject createSubjectWithPrincipals(final Set<Principal> principals, 
Subject subject)
+    private static class RunnableWrapper<T, E extends Exception> implements 
Runnable
     {
-        return new Subject(subject.isReadOnly(), principals, 
subject.getPublicCredentials(), subject.getPrivateCredentials());
-    }
+        private final Task<T, E> _userTask;
+        private final CompletableFuture<T> _future;
 
-    private class ImmediateWrapper<T, E extends Exception> extends 
RunnableWrapper<T, E>
-    {
-        final Runnable _runnable;
-        final Subject _subject;
+        private Runnable _runnable;
+        private Throwable _throwable;
+
+        private final Subject _contextSubject;
 
-        boolean _cancelled;
+        private RunnableWrapper(final Task<T, E> userWork,
+                                final CompletableFuture<T> future,
+                                final Subject subject)
+        {
+            _userTask = userWork;
+            _future = future;
+            _contextSubject = subject;
+        }
 
-        ImmediateWrapper(final Runnable runnable, final Subject subject)
+        private RunnableWrapper(final Runnable runnable, final Subject 
contextSubject)
         {
-            super(null, null);
             _runnable = runnable;
-            _subject = subject;
+            _contextSubject = contextSubject;
+            _userTask = null;
+            _future = null;
         }
 
         @Override
         public void run()
         {
-            if (_cancelled)
+            if (_runnable != null)
             {
+                _runnable.run();
                 return;
             }
-            Subject.doAs(_subject, (PrivilegedAction<Void>) () ->
-            {
-                _runnable.run();
-                return null;
-            });
-        }
-
-        void cancel()
-        {
-            _cancelled = true;
-        }
-    }
-
-    private class RunnableWrapper<T, E extends Exception> implements Runnable
-    {
-        private final Task<T, E> _userTask;
-        private final CompletableFuture<T> _future;
-        private final Subject _contextSubject;
-        private final AtomicReference<Throwable> _throwable;
-
-        public RunnableWrapper(final Task<T, E> userWork, final 
CompletableFuture<T> future)
-        {
-            _userTask = userWork;
-            _future = future;
-            _contextSubject = getCachedSubject();
-            _throwable = new AtomicReference<>();
-        }
-
-        public void run()
-        {
             if (_future.isCancelled() || _future.isCompletedExceptionally())
             {
                 return;
@@ -298,15 +292,15 @@ public class TaskExecutorImpl implements TaskExecutor
                 {
                     return _userTask.execute();
                 }
-                catch (Throwable t)
+                catch (Throwable throwable)
                 {
-                    _throwable.set(t);
-                    _future.obtrudeException(t);
+                    _throwable = throwable;
+                    _future.obtrudeException(throwable);
                 }
                 return null;
             });
 
-            final Throwable throwable = _throwable.get();
+            final Throwable throwable = _throwable;
             if (throwable != null)
             {
                 if (LOGGER.isDebugEnabled())
@@ -326,14 +320,19 @@ public class TaskExecutorImpl implements TaskExecutor
                     throw new RuntimeException(throwable);
                 }
             }
-
-            LOGGER.debug("{} performed successfully with result: {}", this, 
result);
+            if (LOGGER.isDebugEnabled())
+            {
+                LOGGER.debug("{} performed successfully with result: {}", 
this, result);
+            }
             _future.complete(result);
         }
 
         void cancel()
         {
-            _future.completeExceptionally(new CancellationException("Task was 
cancelled"));
+            if (_future != null)
+            {
+                _future.completeExceptionally(new CancellationException("Task 
was cancelled"));
+            }
         }
 
         @Override
@@ -348,23 +347,6 @@ public class TaskExecutorImpl implements TaskExecutor
         }
     }
 
-    private class ImmediateIfSameThreadExecutor implements Executor
-    {
-        @Override
-        public void execute(final Runnable command)
-        {
-            if (isTaskExecutorThread() || (_executor == null && 
(Thread.currentThread() instanceof TaskThread &&
-                    ((TaskThread)Thread.currentThread()).getTaskExecutor() == 
TaskExecutorImpl.this)))
-            {
-                command.run();
-            }
-            else
-            {
-                _executor.execute(new ImmediateWrapper<>(command, 
getCachedSubject()));
-            }
-        }
-    }
-
     private static class TaskThread extends Thread
     {
         private final TaskExecutorImpl _taskExecutor;
diff --git 
a/broker-core/src/test/java/org/apache/qpid/server/configuration/updater/TaskExecutorWithPrincipalTest.java
 
b/broker-core/src/test/java/org/apache/qpid/server/configuration/updater/TaskExecutorWithPrincipalTest.java
deleted file mode 100644
index 036942fc5b..0000000000
--- 
a/broker-core/src/test/java/org/apache/qpid/server/configuration/updater/TaskExecutorWithPrincipalTest.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.server.configuration.updater;
-
-import java.security.AccessController;
-import java.security.PrivilegedAction;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicReference;
-
-import javax.security.auth.Subject;
-
-import org.junit.jupiter.api.AfterEach;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import org.apache.qpid.server.BrokerPrincipal;
-import org.apache.qpid.server.model.AuthenticationProvider;
-import org.apache.qpid.server.model.Broker;
-import org.apache.qpid.server.model.VirtualHost;
-import org.apache.qpid.server.security.auth.UsernamePrincipal;
-import org.apache.qpid.server.virtualhost.VirtualHostPrincipal;
-import org.apache.qpid.test.utils.UnitTestBase;
-
-class TaskExecutorWithPrincipalTest extends UnitTestBase
-{
-    final VirtualHost _virtualHost = mock(VirtualHost.class);
-    final VirtualHostPrincipal _virtualHostPrincipal = new 
VirtualHostPrincipal(_virtualHost);
-
-    private TaskExecutorImpl _executor;
-
-    @BeforeEach
-    void setUp()
-    {
-        _executor = new TaskExecutorImpl("Broker-Config", () -> 
_virtualHostPrincipal);
-        _executor.start();
-    }
-
-    @AfterEach
-    void tearDown()
-    {
-        _executor.stopImmediately();
-    }
-
-    @Test
-    void emptySubject()
-    {
-        final Subject subject = new Subject();
-        final AtomicReference<Subject> taskSubject = new AtomicReference<>();
-
-        runTask(subject, taskSubject, _executor);
-
-        assertEquals(Set.of(_virtualHostPrincipal), 
taskSubject.get().getPrincipals(), "Unexpected security manager principal");
-    }
-
-    @Test
-    void subjectWithBrokerPrincipal()
-    {
-        final Broker<?> broker = mock(Broker.class);
-        final BrokerPrincipal brokerPrincipal = new BrokerPrincipal(broker);
-        final Subject subject = new Subject(true, Set.of(brokerPrincipal), 
Set.of(), Set.of());
-        final AtomicReference<Subject> taskSubject = new AtomicReference<>();
-
-        runTask(subject, taskSubject, _executor);
-
-        assertEquals(2, taskSubject.get().getPrincipals().size(), "Unexpected 
principals count");
-        
assertTrue(taskSubject.get().getPrincipals().contains(brokerPrincipal), 
"Expected to have broker principal");
-        
assertTrue(taskSubject.get().getPrincipals().contains(_virtualHostPrincipal), 
"Expected to have virtualhost principal");
-    }
-
-    @Test
-    void cachedSubjects()
-    {
-        final TaskExecutorImpl spy1 = spy(_executor);
-        final TaskExecutorImpl spy2 = spy(_executor);
-        final AuthenticationProvider authProvider = 
mock(AuthenticationProvider.class);
-        when(authProvider.getName()).thenReturn("authProvider");
-        when(authProvider.getType()).thenReturn("mock");
-        final UsernamePrincipal usernamePrincipal1 = new 
UsernamePrincipal("user1", authProvider);
-        final UsernamePrincipal usernamePrincipal2 = new 
UsernamePrincipal("user2", authProvider);
-        final Subject subject1 = new Subject(true, Set.of(usernamePrincipal1), 
Set.of(), Set.of());
-        final Subject subject2 = new Subject(true, Set.of(usernamePrincipal2), 
Set.of(), Set.of());
-        final AtomicReference<Subject> taskSubject = new AtomicReference<>();
-
-        // subject1 should be created
-        runTask(subject1, taskSubject, spy1);
-        verify(spy1, times(1)).createSubjectWithPrincipals(any(Set.class), 
any(Subject.class));
-
-        // repeated call should retrieve subject1 from cache
-        runTask(subject1, taskSubject, spy2);
-        verify(spy2, never()).createSubjectWithPrincipals(any(Set.class), 
any(Subject.class));
-
-        // subject2 should be created
-        runTask(subject2, taskSubject, spy1);
-        verify(spy1, times(2)).createSubjectWithPrincipals(any(Set.class), 
any(Subject.class));
-
-        // repeated call should retrieve subject2 from cache
-        runTask(subject2, taskSubject, spy2);
-        verify(spy2, never()).createSubjectWithPrincipals(any(Set.class), 
any(Subject.class));
-    }
-
-    private void runTask(final Subject subject, final AtomicReference<Subject> 
taskSubject, final TaskExecutorImpl executor)
-    {
-        Subject.doAs(subject, (PrivilegedAction<Object>) () ->
-        {
-            executor.run(new Task<Void, RuntimeException>()
-            {
-                @Override
-                public Void execute()
-                {
-                    
taskSubject.set(Subject.getSubject(AccessController.getContext()));
-                    return null;
-                }
-
-                @Override
-                public String getObject()
-                {
-                    return getTestName();
-                }
-
-                @Override
-                public String getAction()
-                {
-                    return "test";
-                }
-
-                @Override
-                public String getArguments()
-                {
-                    return null;
-                }
-            });
-            return null;
-        });
-    }
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to