[ 
https://issues.apache.org/jira/browse/QPIDJMS-552?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17439475#comment-17439475
 ] 

ASF GitHub Bot commented on QPIDJMS-552:
----------------------------------------

gemmellr commented on a change in pull request #44:
URL: https://github.com/apache/qpid-jms/pull/44#discussion_r740906037



##########
File path: qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
##########
@@ -151,6 +155,7 @@ public void run() {
 
         this.connectionInfo = connectionInfo;
         this.connectionInfo.setConnection(this);
+        this.completionExecutorService = 
this.connectionInfo.getCompletionExecutorServiceFactory().map(Supplier::get).orElse(null);

Review comment:
       Calling the field e.g 'sharedCompletionExecutorHolder' would make its 
use clearer.
   
   Similarly the getCompletionExecutorServiceFactory feels a bit lengthy, and 
not reflective that it only covers the shared one, e.g 
getSharedCompletionExecutorFactory.
   
   Actually, it feels like most of this line could be done up front, either in 
the ConnectionFactory or inside the ConnectionInfo, such that this line was a 
simple getSharedCompletionExecutorHolder that either returns null (as it will 
by default) or not.

##########
File path: 
qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionFactory.java
##########
@@ -591,6 +613,14 @@ public long getConnectTimeout() {
         return this.connectTimeout;
     }
 
+    public void setCompletionThreads(final int completionThreads) {

Review comment:
       I'd go with SharedCompletionThreads to emphasize its shared, given its 
both same and cross-connection sharing.
   
   EDIT: actually, is it? I'm not seeing where it would actually share? A new 
SharedDisposable looks to be made afresh each time, and has no statics to share 
across anything, which would seem to mean its just a bigger per-connection pool.

##########
File path: qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
##########
@@ -194,6 +198,48 @@ public void onPendingFailure(ProviderException cause) {
         }
     }
 
+    private void processCompletions() {
+        assert processCompletion.get();
+        completionThread = Thread.currentThread();
+        try {
+            final Runnable completionTask = completionTasks.poll();
+            if (completionTask != null) {
+                try {
+                    completionTask.run();
+                } catch (Throwable t) {
+                    LOG.debug("errored on processCompletions duty cycle", t);
+                }
+            }
+        } finally {
+            completionThread = null;
+            processCompletion.set(false);
+        }
+        if (completionTasks.isEmpty()) {
+            return;
+        }
+        // a racing asyncProcessCompletion has won: no need to fire a 
continuation
+        if (!processCompletion.compareAndSet(false, true)) {
+            return;
+        }
+        getCompletionExecutor().execute(this::processCompletions);
+    }
+
+    private void asyncProcessCompletion(final Runnable completionTask) {
+        asyncProcessCompletion(completionTask, false);
+    }
+
+    private void asyncProcessCompletion(final Runnable completionTask, final 
boolean ignoreSessionClosed) {
+        if (!ignoreSessionClosed) {

Review comment:
       I dont think the session closed check thing works. The closed boolean is 
set true as soon as session shutdown starts, but its possible for successfull 
async completions to arrive before the closure completes, simply because they 
are async and so is the session close, so they may be in flight and arrive 
before the session closure. I expect this change would mean they get could be 
marked as failed when they shouldnt, as the completion task would get dropped 
instead of running and then the fallback 'session closed, fail all outstanding 
completions' task would run and say they failed.

##########
File path: qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
##########
@@ -194,6 +198,48 @@ public void onPendingFailure(ProviderException cause) {
         }
     }
 
+    private void processCompletions() {
+        assert processCompletion.get();
+        completionThread = Thread.currentThread();
+        try {
+            final Runnable completionTask = completionTasks.poll();
+            if (completionTask != null) {
+                try {
+                    completionTask.run();
+                } catch (Throwable t) {
+                    LOG.debug("errored on processCompletions duty cycle", t);
+                }
+            }
+        } finally {
+            completionThread = null;
+            processCompletion.set(false);
+        }
+        if (completionTasks.isEmpty()) {
+            return;
+        }
+        // a racing asyncProcessCompletion has won: no need to fire a 
continuation
+        if (!processCompletion.compareAndSet(false, true)) {
+            return;
+        }
+        getCompletionExecutor().execute(this::processCompletions);

Review comment:
       Newline before isEmpty, its on to a different unit of the work really.
   
   I think it would be more readable to either...combine the two 'I can just 
stop now' checks into a single if(empty || CAS) { return }...or instead to gate 
the execute call inside the if(CAS) the same way it is done in the 
asyncProcessCompletion method below.

##########
File path: qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
##########
@@ -1195,23 +1243,27 @@ Executor getDispatcherExecutor() {
     }
 
     private ExecutorService getCompletionExecutor() {
-        ThreadPoolExecutor exec = completionExcecutor;
+        ExecutorService exec = completionExecutor;
         if (exec == null) {
             synchronized (sessionInfo) {
-                exec = completionExcecutor;
+                exec = completionExecutor;
                 if (exec == null) {
-                    exec = createExecutor("completion dispatcher", 
completionThread);
-
+                    if (connection.getCompletionExecutorService() != null) {
+                        exec = connection.getCompletionExecutorService().ref();
+                    } else {
+                        exec = createExecutor("completion dispatcher", null);
+                    }
                     // Ensure work thread is fully up before allowing other 
threads
                     // to attempt to execute on this instance.
-                    Future<?> starter = exec.submit(() -> {});
+                    Future<?> starter = exec.submit(() -> {
+                    });

Review comment:
       Leave it on a single line, its clearer.

##########
File path: qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
##########
@@ -106,7 +109,7 @@
     private final AtomicBoolean started = new AtomicBoolean();
     private final AtomicReference<Exception> failureCause = new 
AtomicReference<>();
     private final JmsConnectionInfo connectionInfo;
-    private final ThreadPoolExecutor executor;
+    protected final ThreadPoolExecutor executor;

Review comment:
       Why does this need exposed?

##########
File path: 
qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionFactory.java
##########
@@ -368,6 +377,19 @@ protected static URI createURI(String name) {
         return null;
     }
 
+    protected Supplier<Holder<ExecutorService>> 
getCompletionExecutorServiceFactory() {
+        if (this.completionThreads == 0) {
+            return null;
+        }
+        synchronized (this) {
+            if (completionExecutorServiceFactory == null) {
+                QpidJMSForkJoinWorkerThreadFactory fjThreadFactory = new 
QpidJMSForkJoinWorkerThreadFactory("completion thread pool", true);
+                completionExecutorServiceFactory = sharedRefCnt(() -> new 
ForkJoinPool(completionThreads, fjThreadFactory, null, false), 
ThreadPoolUtils::shutdown);

Review comment:
       It doesnt feel like we need a supplier of a holder of an executor. Just 
a holder.
   
   I assume thats to try and avoid creating it? Since it seems somewhat 
implicit that if you set the option to get this shared executor behaviour, then 
you actually want it and thus can be expected to need it, its not really clear 
to me its worth the extra mechanics.

##########
File path: 
qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionFactory.java
##########
@@ -591,6 +613,14 @@ public long getConnectTimeout() {
         return this.connectTimeout;
     }
 
+    public void setCompletionThreads(final int completionThreads) {

Review comment:
       I'd go with SharedCompletionThreads to emphasize its shared, given its 
both same and cross-connection sharing.
   
   EDIT: actually, is it? I'm not seeing where it would actually share? A new 
SharedDisposable looks to be made afresh each time, and has no statics to share 
across connections, which would seem to mean its just a bigger per-connection 
pool.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@qpid.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> JMS 2 Completion threads shouldn't scale with the number of sessions
> --------------------------------------------------------------------
>
>                 Key: QPIDJMS-552
>                 URL: https://issues.apache.org/jira/browse/QPIDJMS-552
>             Project: Qpid JMS
>          Issue Type: Bug
>          Components: qpid-jms-client
>    Affects Versions: 1.3.0
>            Reporter: Francesco Nigro
>            Priority: Major
>
> JMS 2 Completion threads are now tied to be one per JMS Session ie a client 
> application with N JMS sessions need N completion threads to handle the 
> completion events.
> Given that the asynchronous model of JMS 2 allows users to have few threads 
> handling many JMS sessions, should be better to reduce the amount of 
> completion threads without exceeding the number of available cores and 
> shrink/grow according to the completion event processing load.
> If the user confine a connection in a thread handling many JMS sessions and 
> the completion events are issued by the same Netty thread in sequence, if the 
> completion processing for a JMS Session is fast enough, next JMS Sessions can 
> reuse existing completion threads instead of using a new one.
> This model save using too many completion threads for users tasks that are 
> supposed to be very fast: if the user task cause a specific JMS Session 
> completion thread to block, the expectation is that the system should be able 
> to create a new completion thread to handle other JMS Session completion 
> events, as expected.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@qpid.apache.org
For additional commands, e-mail: dev-h...@qpid.apache.org

Reply via email to