[GitHub] [qpid-jms] franz1981 commented on a change in pull request #44: QPIDJMS-552 JMS 2 Completion threads shouldn't scale with the number of sessions
franz1981 commented on a change in pull request #44:
URL: https://github.com/apache/qpid-jms/pull/44#discussion_r745592151

## File path: qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionFactory.java ##

@@ -368,6 +377,19 @@ protected static URI createURI(String name) {
         return null;
     }
 
+    protected Supplier> getCompletionExecutorServiceFactory() {
+        if (this.completionThreads == 0) {
+            return null;
+        }
+        synchronized (this) {
+            if (completionExecutorServiceFactory == null) {
+                QpidJMSForkJoinWorkerThreadFactory fjThreadFactory = new QpidJMSForkJoinWorkerThreadFactory("completion thread pool", true);
+                completionExecutorServiceFactory = sharedRefCnt(() -> new ForkJoinPool(completionThreads, fjThreadFactory, null, false), ThreadPoolUtils::shutdown);

Review comment:

> No. I was thinking a super simple 'get me a pool' at connection creation that bumps a count and creates a pool if needed

If the pool acquisition is always handled in a synchronized block, it would work, but the version I've implemented tries to avoid making different connections block on the same lock when a pool is already available. If this is not a problem, then I can use a simpler version.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@qpid.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

To unsubscribe, e-mail: dev-unsubscr...@qpid.apache.org
For additional commands, e-mail: dev-h...@qpid.apache.org
[GitHub] [qpid-jms] franz1981 commented on a change in pull request #44: QPIDJMS-552 JMS 2 Completion threads shouldn't scale with the number of sessions
franz1981 commented on a change in pull request #44:
URL: https://github.com/apache/qpid-jms/pull/44#discussion_r745582793

## File path: qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionFactory.java ##

@@ -368,6 +377,19 @@ protected static URI createURI(String name) {
         return null;
     }
 
+    protected Supplier> getCompletionExecutorServiceFactory() {
+        if (this.completionThreads == 0) {
+            return null;
+        }
+        synchronized (this) {
+            if (completionExecutorServiceFactory == null) {
+                QpidJMSForkJoinWorkerThreadFactory fjThreadFactory = new QpidJMSForkJoinWorkerThreadFactory("completion thread pool", true);
+                completionExecutorServiceFactory = sharedRefCnt(() -> new ForkJoinPool(completionThreads, fjThreadFactory, null, false), ThreadPoolUtils::shutdown);

Review comment:

Got it, so you prefer a concurrent map, keyed by configuration and statically referenced by each connection? The map entries will still be `sharedRefCnt` pools, because each connection must be able to concurrently ref/deref the shared pool (with N threads). What I've implemented already uses a synchronization point while allocating the shared pool, but the concurrent mechanism is needed to ensure that connections never block while releasing, and never prevent incoming ones from acquiring a new/existing pool.
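To make the trade-off above concrete, here is a minimal sketch of a reference-counted shared resource whose release path never takes a lock, and whose acquire path only synchronizes when a new instance must be created. It is an illustration only: `RefCountedShared`, `Holder` and `Lease` are hypothetical names, and this is not the `sharedRefCnt` helper from the PR, whose actual implementation is not shown in this thread.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical illustration; not the sharedRefCnt helper from the PR.
final class RefCountedShared<T> {

    private static final class Holder<V> {
        final V value;
        // The creator holds the first reference.
        final AtomicInteger refs = new AtomicInteger(1);

        Holder(V value) {
            this.value = value;
        }

        // Lock-free retain: fails once the count has already dropped to zero.
        boolean tryRetain() {
            for (;;) {
                final int count = refs.get();
                if (count == 0) {
                    return false;
                }
                if (refs.compareAndSet(count, count + 1)) {
                    return true;
                }
            }
        }
    }

    // One lease per user (e.g. per connection); closing it twice is harmless.
    final class Lease implements AutoCloseable {
        private final Holder<T> holder;
        private final AtomicBoolean closed = new AtomicBoolean();

        private Lease(Holder<T> holder) {
            this.holder = holder;
        }

        T get() {
            return holder.value;
        }

        @Override
        public void close() {
            if (closed.compareAndSet(false, true)) {
                release(holder);
            }
        }
    }

    private final Supplier<T> create;
    private final Consumer<T> dispose;
    private final AtomicReference<Holder<T>> current = new AtomicReference<>();

    RefCountedShared(Supplier<T> create, Consumer<T> dispose) {
        this.create = create;
        this.dispose = dispose;
    }

    Lease acquire() {
        // Fast path: an instance is alive, so no lock is taken.
        Holder<T> holder = current.get();
        if (holder != null && holder.tryRetain()) {
            return new Lease(holder);
        }
        // Slow path: serialize creation so only one new instance is built.
        synchronized (this) {
            holder = current.get();
            if (holder != null && holder.tryRetain()) {
                return new Lease(holder);
            }
            holder = new Holder<>(create.get());
            current.set(holder);
            return new Lease(holder);
        }
    }

    private void release(Holder<T> holder) {
        if (holder.refs.decrementAndGet() == 0) {
            // Only clear the slot if a newer instance has not replaced it.
            current.compareAndSet(holder, null);
            dispose.accept(holder.value);
        }
    }
}
```

The last lease to close disposes of the instance; a later `acquire()` finds either an empty slot or a dead holder and creates a fresh one under the lock.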
[GitHub] [qpid-jms] franz1981 commented on a change in pull request #44: QPIDJMS-552 JMS 2 Completion threads shouldn't scale with the number of sessions
franz1981 commented on a change in pull request #44:
URL: https://github.com/apache/qpid-jms/pull/44#discussion_r745531504

## File path: qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionFactory.java ##

@@ -591,6 +613,14 @@ public long getConnectTimeout() {
         return this.connectTimeout;
     }
 
+    public void setCompletionThreads(final int completionThreads) {

Review comment:

> That would introduce a similar issue to the other JIRA/PR, where people might have different configurations that then needs different pools here...but that seems like it would be easily solved in a pretty similar way, effectively just a map of different setups to match against.

This can be done by creating a "leaky" (on purpose) singleton shared map with a per-configuration pool, but is it a valid use case? Or is the typical use case a single factory per application?
[GitHub] [qpid-jms] franz1981 commented on a change in pull request #44: QPIDJMS-552 JMS 2 Completion threads shouldn't scale with the number of sessions
franz1981 commented on a change in pull request #44:
URL: https://github.com/apache/qpid-jms/pull/44#discussion_r745530285

## File path: qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionFactory.java ##

@@ -368,6 +377,19 @@ protected static URI createURI(String name) {
         return null;
     }
 
+    protected Supplier> getCompletionExecutorServiceFactory() {
+        if (this.completionThreads == 0) {
+            return null;
+        }
+        synchronized (this) {
+            if (completionExecutorServiceFactory == null) {
+                QpidJMSForkJoinWorkerThreadFactory fjThreadFactory = new QpidJMSForkJoinWorkerThreadFactory("completion thread pool", true);
+                completionExecutorServiceFactory = sharedRefCnt(() -> new ForkJoinPool(completionThreads, fjThreadFactory, null, false), ThreadPoolUtils::shutdown);

Review comment:

> Given that, the mechanism still all seems rather overcomplicated. This feels like a relatively simple case, an 'if there is an existing pool, then use that, otherwise create one' check coupled with the opposing cleanup. One that should be relatively infrequently used. It seems like even a simple synchronized block with a count inside could do?

Not sure, there's still a problem related to disposing of it:

1. Should the shared/common pool be allocated once and live forever?
2. If the answer to 1 is no, how/what is going to trigger disposing of it?

The mechanism I've implemented handles this use case with reference counting, but to do so it requires someone to be the first owner, while ensuring correct/deterministic release of resources that could otherwise cause the whole application/class-loader to leak.
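For comparison, the "simple synchronized block with a count inside" from the quoted suggestion could look roughly like the sketch below. It answers question 2 by shutting the pool down when the last user releases it. `SharedCompletionPool` is a hypothetical name, and a plain `ForkJoinPool(int)` stands in for the pool built with the custom thread factory in the diff.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ForkJoinPool;

// Hypothetical illustration of the count-under-a-lock variant; not code from the PR.
final class SharedCompletionPool {
    private final int threads;
    private ExecutorService pool; // guarded by this
    private int users;            // guarded by this

    SharedCompletionPool(int threads) {
        this.threads = threads;
    }

    // Called when a connection starts using completions.
    synchronized ExecutorService acquire() {
        if (users == 0) {
            pool = new ForkJoinPool(threads);
        }
        users++;
        return pool;
    }

    // Called when a connection closes; the last user shuts the pool down.
    synchronized void release() {
        if (users == 0) {
            throw new IllegalStateException("release() without a matching acquire()");
        }
        if (--users == 0) {
            pool.shutdown();
            pool = null;
        }
    }
}
```

Every acquire and release contends on the same monitor here, which is the blocking the reference-counted version tries to avoid; whether that matters depends on how often connections are created and closed.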
[GitHub] [qpid-jms] franz1981 commented on a change in pull request #44: QPIDJMS-552 JMS 2 Completion threads shouldn't scale with the number of sessions
franz1981 commented on a change in pull request #44:
URL: https://github.com/apache/qpid-jms/pull/44#discussion_r743939643

## File path: qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionFactory.java ##

@@ -591,6 +613,14 @@ public long getConnectTimeout() {
         return this.connectTimeout;
     }
 
+    public void setCompletionThreads(final int completionThreads) {

Review comment:

> EDIT: actually, is it? I'm not seeing where it would actually share?

The completion factory creates a singleton instance of `sharedRefCnt` of `ForkJoinPool` that allows sharing the same FJ pool until every connection that references it has been closed. When that happens, the last one disposes of it, leaving incoming connections (if any) able to create a new one, similar to the shared event loop group of #45.

## File path: qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionFactory.java ##

@@ -368,6 +377,19 @@ protected static URI createURI(String name) {
         return null;
     }
 
+    protected Supplier> getCompletionExecutorServiceFactory() {
+        if (this.completionThreads == 0) {
+            return null;
+        }
+        synchronized (this) {
+            if (completionExecutorServiceFactory == null) {
+                QpidJMSForkJoinWorkerThreadFactory fjThreadFactory = new QpidJMSForkJoinWorkerThreadFactory("completion thread pool", true);
+                completionExecutorServiceFactory = sharedRefCnt(() -> new ForkJoinPool(completionThreads, fjThreadFactory, null, false), ThreadPoolUtils::shutdown);

Review comment:

> I assume thats to try and avoid creating it?

Given that we cannot rely on finalization of the connection factory, I cannot pre-allocate it if there are no actual "users", i.e. connections. I would also like it to be correctly disposed of and shut down once every connection belonging to the connection factory has been closed.

## File path: qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java ##

@@ -194,6 +198,48 @@ public void onPendingFailure(ProviderException cause) {
         }
     }
 
+    private void processCompletions() {
+        assert processCompletion.get();
+        completionThread = Thread.currentThread();
+        try {
+            final Runnable completionTask = completionTasks.poll();
+            if (completionTask != null) {
+                try {
+                    completionTask.run();
+                } catch (Throwable t) {
+                    LOG.debug("errored on processCompletions duty cycle", t);
+                }
+            }
+        } finally {
+            completionThread = null;
+            processCompletion.set(false);
+        }
+        if (completionTasks.isEmpty()) {
+            return;
+        }
+        // a racing asyncProcessCompletion has won: no need to fire a continuation
+        if (!processCompletion.compareAndSet(false, true)) {
+            return;
+        }
+        getCompletionExecutor().execute(this::processCompletions);
+    }
+
+    private void asyncProcessCompletion(final Runnable completionTask) {
+        asyncProcessCompletion(completionTask, false);
+    }
+
+    private void asyncProcessCompletion(final Runnable completionTask, final boolean ignoreSessionClosed) {
+        if (!ignoreSessionClosed) {

Review comment:

I can let them pass, but this was meant to mimic the original reject handler installed for the single-threaded completion executor. I admit I didn't put much thought into validating whether it can be avoided.
[GitHub] [qpid-jms] franz1981 commented on a change in pull request #44: QPIDJMS-552 JMS 2 Completion threads shouldn't scale with the number of sessions
franz1981 commented on a change in pull request #44:
URL: https://github.com/apache/qpid-jms/pull/44#discussion_r737530088

## File path: qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java ##

@@ -194,6 +193,42 @@ public void onPendingFailure(ProviderException cause) {
         }
     }
 
+    private void processCompletions() {
+        do {
+            if (!completionThread.compareAndSet(null, Thread.currentThread())) {
+                return;
+            }
+            try {
+                Runnable completionTask;
+                while ((completionTask = completionTasks.poll()) != null) {
+                    try {
+                        completionTask.run();
+                    } catch (Throwable t) {
+                        LOG.debug("errored on processCompletions duty cycle", t);
+                    }
+                }
+            } finally {
+                completionThread.set(null);
+            }
+        } while (!completionTasks.isEmpty());

Review comment:

`isEmpty` can be called by *any* thread concurrently without restrictions, but `poll` has to be called in a single-threaded fashion, i.e. by one thread at a time, or always by the same thread.

The use case for `isEmpty` is exactly this type of duty-cycle loop (very common in Akka actor mailboxes, which use JCTools): the loop can be executed by any thread, but serially, and `isEmpty` is used to detect whether it is worth continuing to drain the mailbox.
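The duty-cycle pattern described above can be sketched in isolation as follows. This is a simplified illustration, not the PR's code: `SerialMailbox` is a hypothetical name, a JDK `ConcurrentLinkedQueue` stands in for the JCTools queue, and an `AtomicBoolean` plays the role of the "who is draining" guard. Tasks may be submitted from any thread, but only one thread at a time ever calls `poll()`.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical illustration of a serially drained mailbox on a shared executor.
final class SerialMailbox {
    private final Queue<Runnable> tasks = new ConcurrentLinkedQueue<>();
    private final AtomicBoolean draining = new AtomicBoolean();
    private final Executor executor;

    SerialMailbox(Executor executor) {
        this.executor = executor;
    }

    // Safe to call from any thread.
    void submit(Runnable task) {
        tasks.add(task);
        trySchedule();
    }

    private void trySchedule() {
        // Only the thread that wins the CAS schedules a drain, so drains never overlap.
        if (draining.compareAndSet(false, true)) {
            executor.execute(this::drain);
        }
    }

    private void drain() {
        try {
            Runnable task;
            // poll() is only ever called by the single thread that owns the drain.
            while ((task = tasks.poll()) != null) {
                try {
                    task.run();
                } catch (Throwable t) {
                    // A failing task must not stop the drain; real code would log here.
                }
            }
        } finally {
            draining.set(false);
        }
        // A producer may have enqueued after the last poll() but lost the CAS above:
        // isEmpty() tells us whether another drain is worth scheduling.
        if (!tasks.isEmpty()) {
            trySchedule();
        }
    }
}
```

With a direct executor such as `Runnable::run`, a task that submits another task sees it run after itself, never reentrantly. The sketch does not handle an executor that rejects the drain task, which would leave `draining` stuck at `true`.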
[GitHub] [qpid-jms] franz1981 commented on a change in pull request #44: QPIDJMS-552 JMS 2 Completion threads shouldn't scale with the number of sessions
franz1981 commented on a change in pull request #44:
URL: https://github.com/apache/qpid-jms/pull/44#discussion_r737506809

## File path: qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java ##

@@ -71,6 +69,7 @@
 import javax.jms.TopicSession;
 import javax.jms.TopicSubscriber;
 
+import io.netty.util.internal.PlatformDependent;

Review comment:

In this case we can just use a CLQ (`ConcurrentLinkedQueue`), although it means creating a new Node for each completion event. That seems a waste to me, given that we have a rare and nice single-consumer use case here, but I understand that adding a JCTools dependency just for this isn't great either...
[GitHub] [qpid-jms] franz1981 commented on a change in pull request #44: QPIDJMS-552 JMS 2 Completion threads shouldn't scale with the number of sessions
franz1981 commented on a change in pull request #44:
URL: https://github.com/apache/qpid-jms/pull/44#discussion_r737491913

## File path: qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java ##

@@ -194,6 +193,42 @@ public void onPendingFailure(ProviderException cause) {
         }
     }
 
+    private void processCompletions() {
+        do {
+            if (!completionThread.compareAndSet(null, Thread.currentThread())) {
+                return;
+            }
+            try {
+                Runnable completionTask;
+                while ((completionTask = completionTasks.poll()) != null) {
+                    try {
+                        completionTask.run();
+                    } catch (Throwable t) {
+                        LOG.debug("errored on processCompletions duty cycle", t);
+                    }
+                }
+            } finally {
+                completionThread.set(null);
+            }
+        } while (!completionTasks.isEmpty());

Review comment:

`isEmpty` is safe to call from many threads, and it ensures that a subsequent (single-threaded) `poll` won't return null if `isEmpty` returned `false` (we have good coverage for this in JCTools).
[GitHub] [qpid-jms] franz1981 commented on a change in pull request #44: QPIDJMS-552 JMS 2 Completion threads shouldn't scale with the number of sessions
franz1981 commented on a change in pull request #44:
URL: https://github.com/apache/qpid-jms/pull/44#discussion_r737494700

## File path: qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java ##

@@ -183,6 +189,44 @@ JmsConnection connect() throws JMSException {
         return this;
     }
 
+    ExecutorService getCompletionExecutor() {
+        ThreadPoolExecutor exec = completionExecutor;
+        if (exec == null) {
+            synchronized (this) {
+                exec = completionExecutor;
+                if (exec == null) {
+                    // it can grow "unbounded" to serve multiple concurrent session completions:
+                    // in reality it is bounded by the amount of concurrent completion requests
+                    exec = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 5, TimeUnit.SECONDS, new LinkedTransferQueue<>(),
+                        new QpidJMSThreadFactory("JmsConnection ["+ connectionInfo.getId() + "] completion dispatcher", connectionInfo.isUseDaemonThread()));

Review comment:

> Previously they were named based on their individual session. It might be good to at least reinstate that name while operating for a given session?

You're right, it was indeed nice to have a good name to find them both in stack traces and while profiling; let me think about how to improve this.

If the user no longer uses JMS 2 features in the same connection and closes any existing session that uses completions, I expect the pool to release any existing completion thread (at some point), but as you rightly said, it's a pretty weird and unhappy use case.
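One possible way to get a session-specific name back on a shared pool thread, offered only as a sketch and not as what the PR ended up doing, is to rename the thread for the duration of each task and restore the previous name afterwards. `NamedTasks` and the name string used below are hypothetical.

```java
// Hypothetical illustration; not code from the PR.
final class NamedTasks {
    private NamedTasks() {
    }

    // Wraps a task so the pooled thread carries `name` only while the task runs.
    static Runnable named(String name, Runnable task) {
        return () -> {
            final Thread current = Thread.currentThread();
            final String previous = current.getName();
            current.setName(name);
            try {
                task.run();
            } finally {
                // Restore the pool's own name so idle threads stay recognizable.
                current.setName(previous);
            }
        };
    }
}
```

A session could then hand `NamedTasks.named("JmsSession [id] completion dispatcher", task)` to the shared executor, so stack traces and profilers show the session only while its completions are running.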
[GitHub] [qpid-jms] franz1981 commented on a change in pull request #44: QPIDJMS-552 JMS 2 Completion threads shouldn't scale with the number of sessions
franz1981 commented on a change in pull request #44:
URL: https://github.com/apache/qpid-jms/pull/44#discussion_r737484210

## File path: qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java ##

@@ -71,6 +69,7 @@
 import javax.jms.TopicSession;
 import javax.jms.TopicSubscriber;
 
+import io.netty.util.internal.PlatformDependent;

Review comment:

It's just a shortcut to avoid checking for the existence of Unsafe: JCTools queues are good performers (and mostly GC-free), but we need to decide whether to use the Atomic or the Unsafe variant based on the presence of the Unsafe class. Using this Netty util class saves us from checking it ourselves, but it can be changed to perform an explicit check too.