Github user dick-twocows commented on a diff in the pull request:

    https://github.com/apache/jena/pull/233#discussion_r110245267
  
    --- Diff: 
jena-arq/src/main/java/org/apache/jena/sparql/core/mosaic/TransactionalDistributed.java
 ---
    @@ -0,0 +1,267 @@
    +package org.apache.jena.sparql.core.mosaic;
    +
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.Objects;
    +import java.util.Set;
    +import java.util.concurrent.Callable;
    +import java.util.concurrent.ConcurrentHashMap;
    +import java.util.concurrent.ConcurrentMap;
    +import java.util.concurrent.ForkJoinPool;
    +import java.util.concurrent.ForkJoinWorkerThread;
    +import java.util.concurrent.Future;
    +import java.util.concurrent.atomic.AtomicInteger;
    +import java.util.function.Supplier;
    +import java.util.stream.Collectors;
    +
    +import org.apache.jena.query.ReadWrite;
    +import org.apache.jena.shared.JenaException;
    +import org.apache.jena.sparql.core.DatasetGraph;
    +import org.apache.jena.sparql.core.Transactional;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +/**
    + * Class to support a distributed Transactional.
    + * 
    + * Thread affinity is maintained by associating a Thread per DatasetGraph. 
    + * 
    + * A TransactionalDistributed should be declared via a ThreadLocal and 
thus provides a MRMW.
    + * 
    + * @author dick
    + *
    + */
    +public class TransactionalDistributed implements Transactional {
    +
    +   private static final Logger LOGGER = 
LoggerFactory.getLogger(TransactionalDistributed.class);
    +   
    +   protected final ForkJoinPool forkJoinPool = new 
ForkJoinPool(Runtime.getRuntime().availableProcessors());
    +   
    +   protected final String id = Thread.currentThread().toString();
    +
    +   protected final AtomicInteger readCount = new AtomicInteger();
    +   
    +   protected final AtomicInteger writeCount = new AtomicInteger();
    +   
    +   protected volatile ReadWrite readWrite = null;
    +   
    +   protected final Set<DatasetGraph> children = 
ConcurrentHashMap.newKeySet(32);
    +   
    +   protected final ConcurrentMap<DatasetGraph, ThreadProxy> workers = new 
ConcurrentHashMap<>(32);
    +
    +   protected class FJWT extends ForkJoinWorkerThread {
    +
    +           public FJWT(final ForkJoinPool pool) {
    +                   super(pool);
    +           }
    +           
    +   }
    +   
    +   public TransactionalDistributed() {
    +           super();
    +   }
    +   
    +   /**
    +    * Execute the given Runnable.
    +    */
    +   public void execute(final Runnable task) {
    +           try {
    +                   forkJoinPool.submit(task).get();
    +           } catch (final Exception exception) {
    +                   throw new JenaException(exception);
    +           }
    +   }
    +   
    +   /**
    +    * Submit the given Callable.
    +    */
    +   public <T> Future<T> submit(final Callable<T> task) {
    +           try {
    +                   return forkJoinPool.submit(task);
    +           } catch (final Exception exception) {
    +                   throw new JenaException(exception);
    +           }
    +   }
    +   
    +   /*
    +    * Distributed transactional methods. The general goal is to maintain 
transactions on the child DatasetGraph's.  
    +    */
    +
    +   public ReadWrite getType() {
    +           return readWrite;
    +   }
    +   
    +   public boolean isIn(final ReadWrite compare) {
    +           return Objects.equals(readWrite, 
Objects.requireNonNull(compare));
    +   }
    +   
    +   protected Set<DatasetGraph> getChildren() {
    +           return children;
    +   }
    +
    +   protected ConcurrentMap<DatasetGraph, ThreadProxy> getWorkers() {
    +           return workers;
    +   }
    +
    +   /**
    +    * Get the ThreadProxy for the given DatasetGraph.
    +    */
    +   protected ThreadProxy getWorker(final DatasetGraph datasetGraph) {
    +           return getWorkers().computeIfAbsent(datasetGraph, (dg) -> 
{return new ThreadProxy();});
    +   }
    +   
    +   /**
    +    * Submit the given Runnable to the correct ThreadProxy based on the 
given datasetGraph.
    +    */
    +   public void execute(final DatasetGraph datasetGraph, final Runnable 
runnable) {
    +           getWorker(datasetGraph).execute(() -> {
    +                   begin(datasetGraph);
    +                   runnable.run();
    +           });
    +   }
    +   
    +   /**
    +    * Submit the given Supplier to the correct ThreadProxy based on the 
given datasetGraph.
    +    */
    +   public <T> Future<T> submit(final DatasetGraph datasetGraph, final 
Supplier<T> supplier) {
    +           return getWorker(datasetGraph).submit(() -> {
    +                   begin(datasetGraph);
    +                   return supplier.get();
    +           });
    +   }
    +   
    +   /**
    +    * Begin a transaction on the given DatasetGraph.
    +    */
    +   protected DatasetGraph begin(final DatasetGraph datasetGraph) {
    +           if (!isInTransaction()) {
    +                   throw new JenaException("No parent transaction");
    +           }
    +           try {
    +                   if (!children.contains(datasetGraph)) {
    +                           if (datasetGraph.supportsTransactions()) {
    +                                   datasetGraph.begin(readWrite);
    +                           }
    +                           children.add(datasetGraph);
    +                   }
    +                   return datasetGraph;
    +           } catch (final Exception exception) {
    +                   throw new JenaException(exception);
    +           }
    +   }
    +   
    +   /*
    +    * Transactional
    +    */
    +   
    +   @Override
    +   public void begin(final ReadWrite readWrite) {
    +           if (this.readWrite != null) {
    +                   throw new JenaException("Already in a transaction " + 
this.readWrite);
    +           }
    +           this.readWrite = readWrite;
    +   }
    +   
    +   @Override
    +   public void commit() {
    +           List<Exception> threw = new LinkedList<>();
    +           children
    +                   .forEach(datasetGraph -> {
    +                           try {
    +                                   getWorker(datasetGraph).execute(() -> {
    +                                           if 
(datasetGraph.supportsTransactions()) {
    +                                                   datasetGraph.commit();
    +                                           }
    +                                   });
    +                           } catch (final Exception exception) {
    +                                   threw.add(exception);
    +                           }
    +                   });
    +           this.children.clear();
    +           this.readWrite = null;
    +           if (!threw.isEmpty()) {
    +                   final JenaException jenaException = new JenaException();
    +                   threw.forEach((exception) -> 
{jenaException.addSuppressed(exception);});
    +                   throw jenaException;
    +           }
    +   }
    +   
    +   @Override
    +   public void abort() {
    +           List<Exception> threw = new LinkedList<>();
    +           children
    +                   .forEach(datasetGraph -> {
    +                           try {
    +                                   getWorker(datasetGraph).execute(() -> {
    +                                           if 
(datasetGraph.supportsTransactions() && 
datasetGraph.supportsTransactionAbort()) {
    +                                                   datasetGraph.abort();
    +                                           } else {
    +                                                   LOGGER.warn("Attempt to 
call Transactional.abort().", datasetGraph.getClass());
    +                                           }
    +                                   });
    +                           } catch (final Exception exception) {
    +                                   threw.add(exception);
    +                           }
    +                   });
    +           this.children.clear();
    +           this.readWrite = null;
    +           if (!threw.isEmpty()) {
    +                   final JenaException jenaException = new JenaException();
    +                   threw.forEach((exception) -> 
{jenaException.addSuppressed(exception);});
    +                   throw jenaException;
    +           }
    +   }
    +   
    +   @Override
    +   public void end() {
    +           if (readWrite != null && readWrite.equals(ReadWrite.WRITE)) {
    +                   LOGGER.warn("End without commit/abort");
    +           }
    +           List<Exception> threw = new LinkedList<>();
    +           children
    +                   .forEach(datasetGraph -> {
    +                           try {
    +                                   getWorker(datasetGraph).execute(() -> {
    +                                           if 
(datasetGraph.supportsTransactions()) {
    +                                                   datasetGraph.end();
    +                                           }
    +                                   });
    +                           } catch (final Exception exception) {
    +                                   threw.add(exception);
    +                           }
    +                   });
    +           this.children.clear();
    +           this.workers.values().forEach(threadDelegate -> {
    --- End diff --
    
    There'll be more of these ;-)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to