[ 
https://issues.apache.org/jira/browse/QPIDJMS-552?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17439504#comment-17439504
 ] 

ASF GitHub Bot commented on QPIDJMS-552:
----------------------------------------

franz1981 commented on a change in pull request #44:
URL: https://github.com/apache/qpid-jms/pull/44#discussion_r743939643



##########
File path: qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionFactory.java
##########
@@ -591,6 +613,14 @@ public long getConnectTimeout() {
         return this.connectTimeout;
     }
 
+    public void setCompletionThreads(final int completionThreads) {

Review comment:
       > EDIT: actually, is it? I'm not seeing where it would actually share?
   
   The completion factory creates a single `sharedRefCnt` holder for the 
`ForkJoinPool`, so the same FJ pool is shared until every connection that 
references it has been closed. When that happens, the last connection disposes 
of it, leaving any later connections free to create a new one, similar to the 
shared event loop group of #45.
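
   To make the sharing behaviour concrete, here is a minimal sketch of a 
reference-counted shared resource. `SharedResource`, `acquire` and `release` 
are hypothetical names chosen for illustration; they are not the PR's 
`sharedRefCnt`/`Holder` API, whose exact shape is not shown in this thread.

```java
import java.util.Arrays;
import java.util.concurrent.ForkJoinPool;
import java.util.function.Consumer;
import java.util.function.Supplier;

/** Hypothetical ref-counted holder: created on first use, disposed by the last user. */
final class SharedResource<T> {
    private final Supplier<T> create;
    private final Consumer<T> dispose;
    private T instance;   // guarded by this
    private int refCount; // guarded by this

    SharedResource(Supplier<T> create, Consumer<T> dispose) {
        this.create = create;
        this.dispose = dispose;
    }

    /** Returns the shared instance, creating it if nobody currently holds one. */
    synchronized T acquire() {
        if (refCount == 0) {
            instance = create.get();
        }
        refCount++;
        return instance;
    }

    /** Drops one reference; returns true if this call disposed the instance. */
    synchronized boolean release() {
        if (refCount == 0) {
            throw new IllegalStateException("release without acquire");
        }
        if (--refCount > 0) {
            return false;
        }
        T last = instance;
        instance = null;
        dispose.accept(last);
        return true;
    }
}

final class SharedResourceDemo {
    /** Returns {shared, aliveAfterFirstRelease, disposedAfterLastRelease, recreated}. */
    static boolean[] run() {
        SharedResource<ForkJoinPool> shared = new SharedResource<ForkJoinPool>(
                () -> new ForkJoinPool(2), ForkJoinPool::shutdown);
        ForkJoinPool a = shared.acquire(); // first "connection" creates the pool
        ForkJoinPool b = shared.acquire(); // second "connection" reuses it
        shared.release();
        boolean alive = !a.isShutdown();   // one user is still left
        shared.release();                  // last user disposes the pool
        boolean disposed = a.isShutdown();
        ForkJoinPool c = shared.acquire(); // a later user gets a fresh pool
        boolean recreated = c != a;
        shared.release();
        return new boolean[] { a == b, alive, disposed, recreated };
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(run()));
    }
}
```

   Because creation happens inside `acquire`, nothing is allocated while there 
are no users, which also fits the point about not relying on finalization of 
the connection factory.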

##########
File path: qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionFactory.java
##########
@@ -368,6 +377,19 @@ protected static URI createURI(String name) {
         return null;
     }
 
+    protected Supplier<Holder<ExecutorService>> getCompletionExecutorServiceFactory() {
+        if (this.completionThreads == 0) {
+            return null;
+        }
+        synchronized (this) {
+            if (completionExecutorServiceFactory == null) {
+                QpidJMSForkJoinWorkerThreadFactory fjThreadFactory = new QpidJMSForkJoinWorkerThreadFactory("completion thread pool", true);
+                completionExecutorServiceFactory = sharedRefCnt(() -> new ForkJoinPool(completionThreads, fjThreadFactory, null, false), ThreadPoolUtils::shutdown);

Review comment:
       > I assume thats to try and avoid creating it?
   
   Given that we cannot rely on finalization of the connection factory, I 
cannot pre-allocate it while there are no actual "users", i.e. connections. I 
would also like it to be correctly disposed of and shut down once every 
connection belonging to the connection factory has been closed.

##########
File path: qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
##########
@@ -194,6 +198,48 @@ public void onPendingFailure(ProviderException cause) {
         }
     }
 
+    private void processCompletions() {
+        assert processCompletion.get();
+        completionThread = Thread.currentThread();
+        try {
+            final Runnable completionTask = completionTasks.poll();
+            if (completionTask != null) {
+                try {
+                    completionTask.run();
+                } catch (Throwable t) {
+                    LOG.debug("errored on processCompletions duty cycle", t);
+                }
+            }
+        } finally {
+            completionThread = null;
+            processCompletion.set(false);
+        }
+        if (completionTasks.isEmpty()) {
+            return;
+        }
+        // a racing asyncProcessCompletion has won: no need to fire a continuation
+        if (!processCompletion.compareAndSet(false, true)) {
+            return;
+        }
+        getCompletionExecutor().execute(this::processCompletions);
+    }
+
+    private void asyncProcessCompletion(final Runnable completionTask) {
+        asyncProcessCompletion(completionTask, false);
+    }
+
+    private void asyncProcessCompletion(final Runnable completionTask, final boolean ignoreSessionClosed) {
+        if (!ignoreSessionClosed) {

Review comment:
       I can let them pass, but this check was meant to mimic the original 
reject handler installed for the single-threaded completion executor. I admit I 
didn't put much thought into validating whether it can be spared.

##########
File path: qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
##########
@@ -194,6 +198,48 @@ public void onPendingFailure(ProviderException cause) {
         }
     }
 
+    private void processCompletions() {
+        assert processCompletion.get();
+        completionThread = Thread.currentThread();
+        try {
+            final Runnable completionTask = completionTasks.poll();
+            if (completionTask != null) {
+                try {
+                    completionTask.run();
+                } catch (Throwable t) {
+                    LOG.debug("errored on processCompletions duty cycle", t);
+                }
+            }
+        } finally {
+            completionThread = null;
+            processCompletion.set(false);
+        }
+        if (completionTasks.isEmpty()) {
+            return;
+        }
+        // a racing asyncProcessCompletion has won: no need to fire a continuation
+        if (!processCompletion.compareAndSet(false, true)) {
+            return;
+        }
+        getCompletionExecutor().execute(this::processCompletions);
+    }
+
+    private void asyncProcessCompletion(final Runnable completionTask) {
+        asyncProcessCompletion(completionTask, false);
+    }
+
+    private void asyncProcessCompletion(final Runnable completionTask, final boolean ignoreSessionClosed) {
+        if (!ignoreSessionClosed) {

Review comment:
       > I expect this change would mean they could get marked as failed 
when they shouldn't
   
   I implemented it intending to mimic (by inlining) the reject policy of the 
single-threaded executor. I admit I didn't put much thought into this: it can 
probably be treated differently, and it may introduce slightly different 
semantics, but I still don't see any harm. My expectation is that when 
session::shutdown is called, any already-submitted completion should be handled 
(that's visible in the new shutdown logic using CountDownLatch), while 
submissions sent *after* shutdown is initiated would be processed only if they 
are part of the `shutdown` logic itself (i.e. `ignoreSessionClosed == true`) 
and ignored otherwise.
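
   A minimal sketch of the semantics described above, under stated 
assumptions. `ClosableCompletions` and its methods are hypothetical names, not 
the PR's code, and the executor is assumed to run tasks one at a time in FIFO 
order (a single-threaded executor stands in for the per-session queue). The 
latch shows one way a `CountDownLatch` could let `shutdown` wait for 
completions that were already submitted.

```java
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical session-side gate in front of a serial, FIFO completion executor. */
final class ClosableCompletions {
    private final AtomicBoolean closed = new AtomicBoolean();
    private final ExecutorService serial; // assumed: one task at a time, FIFO

    ClosableCompletions(ExecutorService serial) {
        this.serial = serial;
    }

    /** Returns false when the task is ignored because the session is closed. */
    boolean submit(Runnable task, boolean ignoreSessionClosed) {
        if (!ignoreSessionClosed && closed.get()) {
            return false; // mimics a reject policy: late submissions are dropped
        }
        serial.execute(task);
        return true;
    }

    /** Marks the session closed, then waits for everything queued before the marker. */
    void shutdown() {
        if (!closed.compareAndSet(false, true)) {
            return; // already shut down
        }
        CountDownLatch drained = new CountDownLatch(1);
        submit(drained::countDown, true); // part of shutdown itself, so it bypasses the check
        try {
            drained.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

final class ClosableCompletionsDemo {
    /** Returns {handledBeforeShutdownReturned, lateAccepted, shutdownTaskAccepted}. */
    static int[] run() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            ClosableCompletions session = new ClosableCompletions(pool);
            AtomicInteger handled = new AtomicInteger();
            for (int i = 0; i < 3; i++) {
                session.submit(handled::incrementAndGet, false);
            }
            session.shutdown();
            int before = handled.get();
            boolean late = session.submit(handled::incrementAndGet, false);
            boolean internal = session.submit(() -> { }, true);
            return new int[] { before, late ? 1 : 0, internal ? 1 : 0 };
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(run())); // prints [3, 0, 1]
    }
}
```

   Note that a submission racing with `shutdown` can still pass the closed 
check and be queued behind the latch marker; the sketch does not try to close 
that window.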




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@qpid.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> JMS 2 Completion threads shouldn't scale with the number of sessions
> --------------------------------------------------------------------
>
>                 Key: QPIDJMS-552
>                 URL: https://issues.apache.org/jira/browse/QPIDJMS-552
>             Project: Qpid JMS
>          Issue Type: Bug
>          Components: qpid-jms-client
>    Affects Versions: 1.3.0
>            Reporter: Francesco Nigro
>            Priority: Major
>
> JMS 2 completion threads are currently tied one-to-one to JMS Sessions, i.e. 
> a client application with N JMS sessions needs N completion threads to handle 
> the completion events.
> Given that the asynchronous model of JMS 2 lets users have a few threads 
> handling many JMS sessions, it would be better to reduce the number of 
> completion threads, without exceeding the number of available cores, and to 
> shrink/grow it according to the completion event processing load.
> If the user confines a connection to a thread handling many JMS sessions and 
> the completion events are issued in sequence by the same Netty thread, then, 
> if the completion processing for a JMS Session is fast enough, the next JMS 
> Sessions can reuse existing completion threads instead of using a new one.
> This model avoids using too many completion threads for user tasks that are 
> supposed to be very fast. If a user task causes a specific JMS Session 
> completion thread to block, the expectation is that the system should be able 
> to create a new completion thread to handle other JMS Session completion 
> events.
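
The model in the description can be sketched as a per-session task queue that 
keeps at most one drain task in flight on a shared pool, so each session's 
completions stay ordered without owning a thread. This is only an illustration 
under assumptions: `SerialCompletions` and its methods are hypothetical names, 
and a fixed pool stands in for whatever shared executor the client uses.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical per-session serializer on top of a shared executor. */
final class SerialCompletions {
    private final Queue<Runnable> tasks = new ConcurrentLinkedQueue<>();
    private final AtomicBoolean draining = new AtomicBoolean();
    private final Executor shared;

    SerialCompletions(Executor shared) {
        this.shared = shared;
    }

    void submit(Runnable task) {
        tasks.add(task);
        schedule();
    }

    /** Fires a drain task only if there is work and none is already in flight. */
    private void schedule() {
        if (!tasks.isEmpty() && draining.compareAndSet(false, true)) {
            shared.execute(this::drainOne);
        }
    }

    /** Runs one task, releases the flag, then re-checks for work that raced in. */
    private void drainOne() {
        try {
            Runnable task = tasks.poll();
            if (task != null) {
                try {
                    task.run();
                } catch (Throwable t) {
                    // a failing callback must not stall the session's queue
                }
            }
        } finally {
            draining.set(false);
        }
        schedule();
    }
}

final class SerialCompletionsDemo {
    /** Runs n tasks for one session on a 4-thread pool; true if they ran in submission order. */
    static boolean runsInOrder(int n) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            SerialCompletions session = new SerialCompletions(pool);
            List<Integer> seen = new ArrayList<>(); // only touched by the single in-flight drain
            CountDownLatch done = new CountDownLatch(n);
            for (int i = 0; i < n; i++) {
                final int id = i;
                session.submit(() -> { seen.add(id); done.countDown(); });
            }
            if (!done.await(10, TimeUnit.SECONDS)) {
                return false;
            }
            for (int i = 0; i < n; i++) {
                if (seen.get(i) != i) {
                    return false;
                }
            }
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runsInOrder(1000));
    }
}
```

Handling one task per drain and then rescheduling lets other sessions' drain 
tasks interleave on the same pool threads, so a fast session does not 
monopolize a thread.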



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
