This is an automated email from the ASF dual-hosted git repository.
vavrtom pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git
The following commit(s) were added to refs/heads/main by this push:
new 976e2a3014 QPID-8704: [Broker-J] Optimize TaskExecutorImpl subject
handling and execution (#304)
976e2a3014 is described below
commit 976e2a3014a3b110da04c31cfee7cc0ab26eeccc
Author: Daniil Kirilyuk <[email protected]>
AuthorDate: Thu Jul 31 12:00:46 2025 +0200
QPID-8704: [Broker-J] Optimize TaskExecutorImpl subject handling and
execution (#304)
* QPID-8704: [Broker-J] Optimize TaskExecutorImpl subject handling and
execution
* QPID-8704: [Broker-J] Restored method's private modifier
---------
Co-authored-by: vavrtom <[email protected]>
---
.../configuration/updater/TaskExecutorImpl.java | 166 +++++++++------------
.../updater/TaskExecutorWithPrincipalTest.java | 163 --------------------
2 files changed, 74 insertions(+), 255 deletions(-)
diff --git a/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutorImpl.java b/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutorImpl.java
index 4d2bda9536..1d04d193ed 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutorImpl.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutorImpl.java
@@ -24,26 +24,21 @@ package org.apache.qpid.server.configuration.updater;
import java.security.AccessController;
import java.security.Principal;
import java.security.PrivilegedAction;
-import java.time.Duration;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
import javax.security.auth.Subject;
-import com.github.benmanes.caffeine.cache.Cache;
-import com.github.benmanes.caffeine.cache.Caffeine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -54,17 +49,12 @@ public class TaskExecutorImpl implements TaskExecutor
{
private static final String TASK_EXECUTION_THREAD_NAME = "Broker-Config";
private static final Logger LOGGER =
LoggerFactory.getLogger(TaskExecutorImpl.class);
- private static final Cache<Set<Principal>, Subject> SUBJECT_CACHE =
Caffeine.newBuilder()
- .expireAfterAccess(Duration.ofMinutes(5))
- .maximumSize(1000)
- .build();
private final PrincipalAccessor _principalAccessor;
private final AtomicBoolean _running = new AtomicBoolean();
- private final ImmediateIfSameThreadExecutor _wrappedExecutor = new
ImmediateIfSameThreadExecutor();
private final String _name;
- private volatile Thread _taskThread;
+ private volatile TaskThread _taskThread;
private volatile ExecutorService _executor;
public TaskExecutorImpl()
@@ -89,18 +79,25 @@ public class TaskExecutorImpl implements TaskExecutor
{
if (_running.compareAndSet(false, true))
{
- LOGGER.debug("Starting task executor {}", _name);
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Starting task executor {}", _name);
+ }
final BlockingQueue<Runnable> workQueue = new
LinkedBlockingQueue<>();
final ThreadFactory factory =
QpidByteBuffer.createQpidByteBufferTrackingThreadFactory(runnable ->
{
- _taskThread = new TaskThread(runnable, _name,
TaskExecutorImpl.this);
- _taskThread.setUncaughtExceptionHandler((thread, throwable) ->
+ final TaskThread taskThread = new TaskThread(runnable, _name,
TaskExecutorImpl.this);
+ taskThread.setUncaughtExceptionHandler((thread, throwable) ->
LOGGER.error("Uncaught exception in task thread",
throwable));
- return _taskThread;
+ _taskThread = taskThread;
+ return taskThread;
});
_executor = new ThreadPoolExecutor(1, 1, 0L,
TimeUnit.MILLISECONDS, workQueue, factory);
- LOGGER.debug("Task executor is started");
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Task executor is started");
+ }
}
}
@@ -112,7 +109,10 @@ public class TaskExecutorImpl implements TaskExecutor
final ExecutorService executor = _executor;
if (executor != null)
{
- LOGGER.debug("Stopping task executor {} immediately", _name);
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Stopping task executor {} immediately",
_name);
+ }
final List<Runnable> cancelledTasks = executor.shutdownNow();
cancelledTasks.forEach(runnable ->
{
@@ -123,9 +123,11 @@ public class TaskExecutorImpl implements TaskExecutor
});
_executor = null;
_taskThread = null;
- LOGGER.debug("Task executor was stopped immediately. Number of
unfinished tasks: {}", cancelledTasks.size());
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Task executor was stopped immediately.
Number of unfinished tasks: {}", cancelledTasks.size());
+ }
}
- SUBJECT_CACHE.invalidateAll();
}
}
@@ -137,13 +139,18 @@ public class TaskExecutorImpl implements TaskExecutor
final ExecutorService executor = _executor;
if (executor != null)
{
- LOGGER.debug("Stopping task executor {}", _name);
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Stopping task executor {}", _name);
+ }
executor.shutdown();
_executor = null;
_taskThread = null;
- LOGGER.debug("Task executor is stopped");
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Task executor is stopped");
+ }
}
- SUBJECT_CACHE.invalidateAll();
}
}
@@ -172,7 +179,7 @@ public class TaskExecutorImpl implements TaskExecutor
}
final CompletableFuture<T> future = new CompletableFuture<>();
- _executor.execute(new RunnableWrapper<>(task, future));
+ _executor.execute(new RunnableWrapper<>(task, future,
effectiveSubject()));
return future;
}
@@ -183,7 +190,15 @@ public class TaskExecutorImpl implements TaskExecutor
{
LOGGER.trace("Running runnable {} through executor interface",
command);
}
- _wrappedExecutor.execute(command);
+ if (isTaskExecutorThread() || (_executor == null &&
(Thread.currentThread() instanceof TaskThread &&
+ ((TaskThread)Thread.currentThread()).getTaskExecutor() ==
TaskExecutorImpl.this)))
+ {
+ command.run();
+ }
+ else
+ {
+ _executor.execute(new RunnableWrapper<>(command,
effectiveSubject()));
+ }
}
@Override
@@ -206,7 +221,7 @@ public class TaskExecutorImpl implements TaskExecutor
}
}
- private Subject getCachedSubject()
+ private Subject effectiveSubject()
{
final Subject contextSubject =
Subject.getSubject(AccessController.getContext());
@@ -225,65 +240,44 @@ public class TaskExecutorImpl implements TaskExecutor
final Set<Principal> principals = new
HashSet<>(contextSubject.getPrincipals());
principals.add(accessorPrincipal);
- return SUBJECT_CACHE.get(principals, key ->
createSubjectWithPrincipals(key, contextSubject));
+ return new Subject(contextSubject.isReadOnly(), principals,
contextSubject.getPublicCredentials(), contextSubject.getPrivateCredentials());
}
- Subject createSubjectWithPrincipals(final Set<Principal> principals,
Subject subject)
+ private static class RunnableWrapper<T, E extends Exception> implements
Runnable
{
- return new Subject(subject.isReadOnly(), principals,
subject.getPublicCredentials(), subject.getPrivateCredentials());
- }
+ private final Task<T, E> _userTask;
+ private final CompletableFuture<T> _future;
- private class ImmediateWrapper<T, E extends Exception> extends
RunnableWrapper<T, E>
- {
- final Runnable _runnable;
- final Subject _subject;
+ private Runnable _runnable;
+ private Throwable _throwable;
+
+ private final Subject _contextSubject;
- boolean _cancelled;
+ private RunnableWrapper(final Task<T, E> userWork,
+ final CompletableFuture<T> future,
+ final Subject subject)
+ {
+ _userTask = userWork;
+ _future = future;
+ _contextSubject = subject;
+ }
- ImmediateWrapper(final Runnable runnable, final Subject subject)
+ private RunnableWrapper(final Runnable runnable, final Subject
contextSubject)
{
- super(null, null);
_runnable = runnable;
- _subject = subject;
+ _contextSubject = contextSubject;
+ _userTask = null;
+ _future = null;
}
@Override
public void run()
{
- if (_cancelled)
+ if (_runnable != null)
{
+ _runnable.run();
return;
}
- Subject.doAs(_subject, (PrivilegedAction<Void>) () ->
- {
- _runnable.run();
- return null;
- });
- }
-
- void cancel()
- {
- _cancelled = true;
- }
- }
-
- private class RunnableWrapper<T, E extends Exception> implements Runnable
- {
- private final Task<T, E> _userTask;
- private final CompletableFuture<T> _future;
- private final Subject _contextSubject;
- private final AtomicReference<Throwable> _throwable;
-
- public RunnableWrapper(final Task<T, E> userWork, final
CompletableFuture<T> future)
- {
- _userTask = userWork;
- _future = future;
- _contextSubject = getCachedSubject();
- _throwable = new AtomicReference<>();
- }
-
- public void run()
- {
if (_future.isCancelled() || _future.isCompletedExceptionally())
{
return;
@@ -298,15 +292,15 @@ public class TaskExecutorImpl implements TaskExecutor
{
return _userTask.execute();
}
- catch (Throwable t)
+ catch (Throwable throwable)
{
- _throwable.set(t);
- _future.obtrudeException(t);
+ _throwable = throwable;
+ _future.obtrudeException(throwable);
}
return null;
});
- final Throwable throwable = _throwable.get();
+ final Throwable throwable = _throwable;
if (throwable != null)
{
if (LOGGER.isDebugEnabled())
@@ -326,14 +320,19 @@ public class TaskExecutorImpl implements TaskExecutor
throw new RuntimeException(throwable);
}
}
-
- LOGGER.debug("{} performed successfully with result: {}", this,
result);
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("{} performed successfully with result: {}",
this, result);
+ }
_future.complete(result);
}
void cancel()
{
- _future.completeExceptionally(new CancellationException("Task was
cancelled"));
+ if (_future != null)
+ {
+ _future.completeExceptionally(new CancellationException("Task
was cancelled"));
+ }
}
@Override
@@ -348,23 +347,6 @@ public class TaskExecutorImpl implements TaskExecutor
}
}
- private class ImmediateIfSameThreadExecutor implements Executor
- {
- @Override
- public void execute(final Runnable command)
- {
- if (isTaskExecutorThread() || (_executor == null &&
(Thread.currentThread() instanceof TaskThread &&
- ((TaskThread)Thread.currentThread()).getTaskExecutor() ==
TaskExecutorImpl.this)))
- {
- command.run();
- }
- else
- {
- _executor.execute(new ImmediateWrapper<>(command,
getCachedSubject()));
- }
- }
- }
-
private static class TaskThread extends Thread
{
private final TaskExecutorImpl _taskExecutor;
diff --git a/broker-core/src/test/java/org/apache/qpid/server/configuration/updater/TaskExecutorWithPrincipalTest.java b/broker-core/src/test/java/org/apache/qpid/server/configuration/updater/TaskExecutorWithPrincipalTest.java
deleted file mode 100644
index 036942fc5b..0000000000
--- a/broker-core/src/test/java/org/apache/qpid/server/configuration/updater/TaskExecutorWithPrincipalTest.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.server.configuration.updater;
-
-import java.security.AccessController;
-import java.security.PrivilegedAction;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicReference;
-
-import javax.security.auth.Subject;
-
-import org.junit.jupiter.api.AfterEach;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import org.apache.qpid.server.BrokerPrincipal;
-import org.apache.qpid.server.model.AuthenticationProvider;
-import org.apache.qpid.server.model.Broker;
-import org.apache.qpid.server.model.VirtualHost;
-import org.apache.qpid.server.security.auth.UsernamePrincipal;
-import org.apache.qpid.server.virtualhost.VirtualHostPrincipal;
-import org.apache.qpid.test.utils.UnitTestBase;
-
-class TaskExecutorWithPrincipalTest extends UnitTestBase
-{
- final VirtualHost _virtualHost = mock(VirtualHost.class);
- final VirtualHostPrincipal _virtualHostPrincipal = new
VirtualHostPrincipal(_virtualHost);
-
- private TaskExecutorImpl _executor;
-
- @BeforeEach
- void setUp()
- {
- _executor = new TaskExecutorImpl("Broker-Config", () ->
_virtualHostPrincipal);
- _executor.start();
- }
-
- @AfterEach
- void tearDown()
- {
- _executor.stopImmediately();
- }
-
- @Test
- void emptySubject()
- {
- final Subject subject = new Subject();
- final AtomicReference<Subject> taskSubject = new AtomicReference<>();
-
- runTask(subject, taskSubject, _executor);
-
- assertEquals(Set.of(_virtualHostPrincipal),
taskSubject.get().getPrincipals(), "Unexpected security manager principal");
- }
-
- @Test
- void subjectWithBrokerPrincipal()
- {
- final Broker<?> broker = mock(Broker.class);
- final BrokerPrincipal brokerPrincipal = new BrokerPrincipal(broker);
- final Subject subject = new Subject(true, Set.of(brokerPrincipal),
Set.of(), Set.of());
- final AtomicReference<Subject> taskSubject = new AtomicReference<>();
-
- runTask(subject, taskSubject, _executor);
-
- assertEquals(2, taskSubject.get().getPrincipals().size(), "Unexpected
principals count");
-
assertTrue(taskSubject.get().getPrincipals().contains(brokerPrincipal),
"Expected to have broker principal");
-
assertTrue(taskSubject.get().getPrincipals().contains(_virtualHostPrincipal),
"Expected to have virtualhost principal");
- }
-
- @Test
- void cachedSubjects()
- {
- final TaskExecutorImpl spy1 = spy(_executor);
- final TaskExecutorImpl spy2 = spy(_executor);
- final AuthenticationProvider authProvider =
mock(AuthenticationProvider.class);
- when(authProvider.getName()).thenReturn("authProvider");
- when(authProvider.getType()).thenReturn("mock");
- final UsernamePrincipal usernamePrincipal1 = new
UsernamePrincipal("user1", authProvider);
- final UsernamePrincipal usernamePrincipal2 = new
UsernamePrincipal("user2", authProvider);
- final Subject subject1 = new Subject(true, Set.of(usernamePrincipal1),
Set.of(), Set.of());
- final Subject subject2 = new Subject(true, Set.of(usernamePrincipal2),
Set.of(), Set.of());
- final AtomicReference<Subject> taskSubject = new AtomicReference<>();
-
- // subject1 should be created
- runTask(subject1, taskSubject, spy1);
- verify(spy1, times(1)).createSubjectWithPrincipals(any(Set.class),
any(Subject.class));
-
- // repeated call should retrieve subject1 from cache
- runTask(subject1, taskSubject, spy2);
- verify(spy2, never()).createSubjectWithPrincipals(any(Set.class),
any(Subject.class));
-
- // subject2 should be created
- runTask(subject2, taskSubject, spy1);
- verify(spy1, times(2)).createSubjectWithPrincipals(any(Set.class),
any(Subject.class));
-
- // repeated call should retrieve subject2 from cache
- runTask(subject2, taskSubject, spy2);
- verify(spy2, never()).createSubjectWithPrincipals(any(Set.class),
any(Subject.class));
- }
-
- private void runTask(final Subject subject, final AtomicReference<Subject>
taskSubject, final TaskExecutorImpl executor)
- {
- Subject.doAs(subject, (PrivilegedAction<Object>) () ->
- {
- executor.run(new Task<Void, RuntimeException>()
- {
- @Override
- public Void execute()
- {
-
taskSubject.set(Subject.getSubject(AccessController.getContext()));
- return null;
- }
-
- @Override
- public String getObject()
- {
- return getTestName();
- }
-
- @Override
- public String getAction()
- {
- return "test";
- }
-
- @Override
- public String getArguments()
- {
- return null;
- }
- });
- return null;
- });
- }
-}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]