Github user dick-twocows commented on a diff in the pull request:

    https://github.com/apache/jena/pull/233#discussion_r110244425
  
    --- Diff: jena-arq/src/main/java/org/apache/jena/sparql/core/mosaic/ThreadProxy.java ---
    @@ -0,0 +1,100 @@
    +package org.apache.jena.sparql.core.mosaic;
    +
    +import java.util.concurrent.Callable;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.Future;
    +import java.util.concurrent.LinkedBlockingQueue;
    +import java.util.concurrent.ThreadFactory;
    +import java.util.concurrent.ThreadPoolExecutor;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicInteger;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +/**
    + * A Thread proxy which performs actions using one thread (the proxy).
    + * Internally an ExecutorService with one Thread is used with the action methods execute/submit(Runnable) and submit(Callable<T>).
    + * Calls to the action methods are FIFO actioned through the LinkedBlockingQueue of the ExecutorService.
    + * Useful where Thread affinity is required in a parallel processing environment, e.g. .parallelStream().flatMap(...)
    + * 
    + * @author dick
    + *
    + */
    +public class ThreadProxy {
    +
    +   private static final Logger LOGGER = LoggerFactory.getLogger(ThreadProxy.class);
    +   
    +   protected static final IDFactory ID_FACTORY = IDFactory.valueOf(ThreadProxy.class);
    +   
    +   protected static final ThreadGroup THREAD_GROUP = new ThreadGroup(ThreadProxy.class.getName());
    +   
    +   protected final String id;
    +   
    +   protected final String createdBy;
    +   
    +   protected final ExecutorService executorService;
    +   
    +   protected final AtomicInteger executeCount;
    +   
    +   protected final AtomicInteger submitCount;
    +   
    +   protected final AtomicInteger exceptionCount;
    +   
    +   public ThreadProxy() {
    +           super();
    +           id = ID_FACTORY.next();
    +           createdBy =  Thread.currentThread().getName();
    +           executorService = new ThreadPoolExecutor(1, 1,
    --- End diff --
    
    Yes, and to close the executor. Also, the parallel streams in mosaic (via the 
transactional distributed) use their own fork join pool and create a thread proxy 
per dataset graph touched. Otherwise the system default fork join pool gets used, 
and that limits the threads to the processor count across all requests!
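
    For illustration only, here is a minimal sketch of that arrangement. It is not 
the code from this pull request: the class name ThreadAffinitySketch and the method 
runWithProxy are hypothetical, and a plain single-thread executor stands in for 
ThreadProxy. It also assumes the widely used but undocumented behaviour that a 
parallel stream started from inside a ForkJoinPool task runs in that pool rather 
than in the common pool.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.IntStream;

public class ThreadAffinitySketch {

    /**
     * Runs a parallel stream inside a dedicated ForkJoinPool while funnelling
     * every per-element action through one single-threaded executor (standing
     * in for a thread proxy). Returns the names of the threads that actually
     * executed the actions.
     */
    public static Set<String> runWithProxy(int parallelism, int items) {
        // Hypothetical stand-in for ThreadProxy: one thread, FIFO queue.
        ExecutorService proxy = Executors.newSingleThreadExecutor();
        // Dedicated pool, so the work does not compete for the common pool.
        ForkJoinPool pool = new ForkJoinPool(parallelism);
        Set<String> actionThreads = ConcurrentHashMap.newKeySet();
        try {
            pool.submit(() ->
                IntStream.range(0, items).parallel().forEach(i -> {
                    try {
                        // Hand the action to the proxy thread and wait for it.
                        proxy.submit(() -> {
                            actionThreads.add(Thread.currentThread().getName());
                        }).get();
                    } catch (InterruptedException | ExecutionException e) {
                        throw new IllegalStateException(e);
                    }
                })
            ).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            // Close both executors once the work is done.
            proxy.shutdown();
            pool.shutdown();
        }
        return actionThreads;
    }

    public static void main(String[] args) {
        Set<String> threads = runWithProxy(4, 100);
        System.out.println("actions ran on " + threads.size() + " thread(s)");
    }
}
```

    The stream elements are spread over the pool's workers, but every action lands 
on the single proxy thread, which is the thread affinity described in the Javadoc 
above. Shutting down both executors in the finally block covers the "close the 
executor" point.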

