Github user dick-twocows commented on a diff in the pull request: https://github.com/apache/jena/pull/233#discussion_r110244425 --- Diff: jena-arq/src/main/java/org/apache/jena/sparql/core/mosaic/ThreadProxy.java --- @@ -0,0 +1,100 @@ +package org.apache.jena.sparql.core.mosaic; + +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A Thread proxy which performs actions using one thread (the proxy). + * Internally an ExecutorService with one Thread is used with the action methods execute/submit(Runnable) and submit(Callable<T>). + * Calls to the action methods are FIFO actioned through the LinkedBlockingQueue of the ExecutorService. + * Useful where Thread affinity is required in a parallel processing environment, e.g. .parallelStream().flatMap(...) 
+ * + * @author dick + * + */ +public class ThreadProxy { + + private static final Logger LOGGER = LoggerFactory.getLogger(ThreadProxy.class); + + protected static final IDFactory ID_FACTORY = IDFactory.valueOf(ThreadProxy.class); + + protected static final ThreadGroup THREAD_GROUP = new ThreadGroup(ThreadProxy.class.getName()); + + protected final String id; + + protected final String createdBy; + + protected final ExecutorService executorService; + + protected final AtomicInteger executeCount; + + protected final AtomicInteger submitCount; + + protected final AtomicInteger exceptionCount; + + public ThreadProxy() { + super(); + id = ID_FACTORY.next(); + createdBy = Thread.currentThread().getName(); + executorService = new ThreadPoolExecutor(1, 1, --- End diff -- Yes, and to close the executor. Also, the parallel streams in mosaic, via the transactional distributed, use a fork join and create a thread proxy per dataset graph touched. Otherwise the system default fork join gets used, and that limits the threads to the current processor count across all requests!
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---