JENA-1612: A lightly parallel loader plan.

Project: http://git-wip-us.apache.org/repos/asf/jena/repo
Commit: http://git-wip-us.apache.org/repos/asf/jena/commit/21c97633
Tree: http://git-wip-us.apache.org/repos/asf/jena/tree/21c97633
Diff: http://git-wip-us.apache.org/repos/asf/jena/diff/21c97633

Branch: refs/heads/master
Commit: 21c976332a9444984247f85d1d2ab7a5900ed25d
Parents: ecb01ec
Author: Andy Seaborne <[email protected]>
Authored: Sat Oct 6 17:15:22 2018 +0100
Committer: Andy Seaborne <[email protected]>
Committed: Mon Oct 8 13:02:05 2018 +0100

----------------------------------------------------------------------
 jena-cmds/src/main/java/tdb2/tdbloader.java     | 11 ++++++++--
 .../apache/jena/tdb2/loader/LoaderFactory.java  | 21 ++++++++++++++++++++
 .../jena/tdb2/loader/main/DataToTuples.java     |  4 ++--
 .../tdb2/loader/main/DataToTuplesInline.java    |  2 +-
 .../jena/tdb2/loader/main/InputStage.java       |  2 +-
 .../jena/tdb2/loader/main/LoaderMain.java       |  4 +++-
 .../jena/tdb2/loader/main/LoaderPlans.java      | 21 ++++++++++++++++++++
 .../apache/jena/tdb2/loader/TestLoaderMain.java |  1 +
 .../apache/jena/tdb2/loader/TestLoaderStd.java  |  3 +++
 9 files changed, 62 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/jena/blob/21c97633/jena-cmds/src/main/java/tdb2/tdbloader.java
----------------------------------------------------------------------
diff --git a/jena-cmds/src/main/java/tdb2/tdbloader.java 
b/jena-cmds/src/main/java/tdb2/tdbloader.java
index 52e541a..1ee896e 100644
--- a/jena-cmds/src/main/java/tdb2/tdbloader.java
+++ b/jena-cmds/src/main/java/tdb2/tdbloader.java
@@ -36,6 +36,7 @@ import org.apache.jena.tdb2.loader.DataLoader;
 import org.apache.jena.tdb2.loader.LoaderFactory;
 import org.apache.jena.tdb2.loader.base.LoaderOps;
 import org.apache.jena.tdb2.loader.base.MonitorOutput;
+import org.apache.jena.tdb2.loader.main.LoaderPlans;
 import tdb2.cmdline.CmdTDB;
 import tdb2.cmdline.CmdTDBGraph;
 
@@ -43,7 +44,7 @@ public class tdbloader extends CmdTDBGraph {
     private static final ArgDecl argStats = new ArgDecl(ArgDecl.HasValue,  
"stats");
     private static final ArgDecl argLoader = new ArgDecl(ArgDecl.HasValue, 
"loader");
     
-    private enum LoaderEnum { Basic, Parallel, Sequential, Phased }
+    private enum LoaderEnum { Basic, Parallel, Sequential, Light, Phased }
     
     private boolean showProgress = true;
     private boolean generateStats = false;
@@ -57,7 +58,7 @@ public class tdbloader extends CmdTDBGraph {
     protected tdbloader(String[] argv) {
         super(argv);
 //        super.add(argStats, "Generate statistics");
-        super.add(argLoader, "--loader=", "Loader to use: 'basic', 'phased' 
(default), 'sequential' or 'parallel'");
+        super.add(argLoader, "--loader=", "Loader to use: 'basic', 'phased' 
(default), 'sequential', 'parallel' or 'light'");
     }
 
     @Override
@@ -74,6 +75,8 @@ public class tdbloader extends CmdTDBGraph {
                 loader = LoaderEnum.Sequential;
             else if ( loadername.matches("para.*") )
                 loader = LoaderEnum.Parallel;
+            else if ( loadername.matches("light.*") )
+                loader = LoaderEnum.Light;
             else
                 throw new CmdException("Unrecognized value for --loader: 
"+loadername);
         }
@@ -174,6 +179,8 @@ public class tdbloader extends CmdTDBGraph {
                 return LoaderFactory.parallelLoader(dsg, gn, output);
             case Sequential :
                 return LoaderFactory.sequentialLoader(dsg, gn, output);
+            case Light :
+                return LoaderFactory.createLoader(LoaderPlans.loaderPlanLight, 
dsg, gn, output);
             case Basic :
                 return LoaderFactory.basicLoader(dsg, gn, output);
             default :

http://git-wip-us.apache.org/repos/asf/jena/blob/21c97633/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/LoaderFactory.java
----------------------------------------------------------------------
diff --git 
a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/LoaderFactory.java
 
b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/LoaderFactory.java
index 83f13c9..006d1c8 100644
--- 
a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/LoaderFactory.java
+++ 
b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/LoaderFactory.java
@@ -26,8 +26,11 @@ import org.apache.jena.sparql.core.DatasetGraph;
 import org.apache.jena.sparql.core.Quad;
 import org.apache.jena.tdb2.loader.base.MonitorOutput;
 import org.apache.jena.tdb2.loader.basic.LoaderBasic;
+import org.apache.jena.tdb2.loader.main.LoaderMain;
 import org.apache.jena.tdb2.loader.main.LoaderParallel;
 import org.apache.jena.tdb2.loader.main.LoaderPhased;
+import org.apache.jena.tdb2.loader.main.LoaderPlan;
+import org.apache.jena.tdb2.loader.main.LoaderPlans;
 import org.apache.jena.tdb2.loader.sequential.LoaderSequential;
 
 /** Obtain a {@link DataLoader}.
@@ -194,6 +197,24 @@ public class LoaderFactory {
     }
 
     /**
+     * Return a loader to load a dataset, using the provided plan.
+     * See {@link LoaderPlans} for the standard plans.
+     */  
+    public static DataLoader createLoader(LoaderPlan plan, DatasetGraph dsg, 
MonitorOutput output) {
+        Objects.requireNonNull(dsg);
+        return new LoaderMain(plan, dsg, output);
+    }
+
+    /**
+     * Return a loader to load a graph, using the provided plan.
+     * See {@link LoaderPlans} for the standard plans.
+     */  
+    public static DataLoader createLoader(LoaderPlan plan, DatasetGraph dsg, 
Node graphName, MonitorOutput output) {
+        Objects.requireNonNull(dsg);
+        return new LoaderMain(plan, dsg, graphName, output);
+    }
+
+    /**
      * Return a general purpose loader to load a dataset.
      * This default may change between versions.
      */  

http://git-wip-us.apache.org/repos/asf/jena/blob/21c97633/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/DataToTuples.java
----------------------------------------------------------------------
diff --git 
a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/DataToTuples.java
 
b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/DataToTuples.java
index 02b163d..2b0cbd1 100644
--- 
a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/DataToTuples.java
+++ 
b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/DataToTuples.java
@@ -42,8 +42,8 @@ import org.apache.jena.tdb2.store.DatasetGraphTDB;
 import org.apache.jena.tdb2.store.NodeId;
 import org.apache.jena.tdb2.store.nodetable.NodeTable;
 
-/** Batch processing of {@link DataBlock}s (triples or Quads) converting them 
to two output of
- * to blocks of {@code Tuple<NodeId>}.
+/** Batch processing of {@link DataBlock}s (triples or Quads) converting them 
to two outputs of
+ * blocks of {@code Tuple<NodeId>}.
  * <p>
  * This class runs one task thread.
  * <p>

http://git-wip-us.apache.org/repos/asf/jena/blob/21c97633/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/DataToTuplesInline.java
----------------------------------------------------------------------
diff --git 
a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/DataToTuplesInline.java
 
b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/DataToTuplesInline.java
index c6b5075..6549425 100644
--- 
a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/DataToTuplesInline.java
+++ 
b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/DataToTuplesInline.java
@@ -103,7 +103,7 @@ public class DataToTuplesInline implements 
StreamRDFCounting, BulkStartFinish {
         }
         dispatchTriples(LoaderConst.END_TUPLES);
         if ( quads != null && ! quads.isEmpty() ) {
-            dispatchTriples(quads);
+            dispatchQuads(quads);
             quads = null;
         }
         dispatchQuads(LoaderConst.END_TUPLES);

http://git-wip-us.apache.org/repos/asf/jena/blob/21c97633/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/InputStage.java
----------------------------------------------------------------------
diff --git 
a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/InputStage.java
 
b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/InputStage.java
index 13cb6ff..2e6c772 100644
--- 
a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/InputStage.java
+++ 
b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/InputStage.java
@@ -23,7 +23,7 @@ package org.apache.jena.tdb2.loader.main;
  * can be done in several ways.
  * <ul>
  * <li> {@code MULTI} - one thread parsing (caller), one for nodetable/tuples, 
and one for each index
- * <li> {@code PARSE_NODE} - one thread parsing (caller) and also 
nodetable/tuples, and one for each index
+ * <li> {@code PARSE_NODE} - one thread parsing (caller) and one for 
nodetable/tuples, and one for each index
  * <li> {@code PARSE_NODE_INDEX} - use the caller thread for all operations
  * </ul>
  * {@code MULTI} is fastest when hardware allows.

http://git-wip-us.apache.org/repos/asf/jena/blob/21c97633/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/LoaderMain.java
----------------------------------------------------------------------
diff --git 
a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/LoaderMain.java
 
b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/LoaderMain.java
index 458d4f0..8bd158d 100644
--- 
a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/LoaderMain.java
+++ 
b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/LoaderMain.java
@@ -133,7 +133,8 @@ public class LoaderMain extends LoaderBase implements 
DataLoader {
     }
 
     /**
-     * Create data ingestion and primary index building of a {@link 
LoaderPlan}. 
+     * Create data ingestion and primary index building of a {@link 
LoaderPlan}.
+     * Separate threads for parsing, node table loading and primary index 
building.  
      */
     private static StreamRDFCounting executeData(LoaderPlan loaderPlan, 
DatasetGraphTDB dsgtdb, Map<String, TupleIndex> indexMap, List<BulkStartFinish> 
dataProcess, MonitorOutput output) {
         DatasetPrefixesTDB dps = (DatasetPrefixesTDB)dsgtdb.getPrefixes();
@@ -165,6 +166,7 @@ public class LoaderMain extends LoaderBase implements 
DataLoader {
 
     /**
      * Create data ingestion and primary index building of a {@link 
LoaderPlan}.
+     * One thread for parsing and node table building and one for each primary 
index building.  
      * This version uses a thread for parse/NodeTable/Tuple and a thread for 
each of triple and quad index for phase one.  
      */
     private static StreamRDFCounting executeDataParseId(LoaderPlan loaderPlan, 
DatasetGraphTDB dsgtdb, Map<String, TupleIndex> indexMap, List<BulkStartFinish> 
dataProcess, MonitorOutput output) {

http://git-wip-us.apache.org/repos/asf/jena/blob/21c97633/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/LoaderPlans.java
----------------------------------------------------------------------
diff --git 
a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/LoaderPlans.java
 
b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/LoaderPlans.java
index a97693c..0649c9d 100644
--- 
a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/LoaderPlans.java
+++ 
b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/LoaderPlans.java
@@ -62,6 +62,27 @@ public class LoaderPlans {
         );
     
     /**
+     * Lightly parallel, intermediate plan: for triples, this is two-threaded. 
It aims to
+     * speed up the data phase on a machine where an index is larger than the 
size of
+     * available RAM (not heap). In this case it is better to use the RAM to 
cache a
+     * single index well than to work in parallel on two or more indexes. 
As with all load
+     * plans, data shape and machine characteristics affect speed, so 
experimentation is
+     * recommended.
+     * <p>
+     * Data phase: one thread for parsing and building the node table, and one 
thread for
+     * each primary index.
+     * <p>
+     * Index phase: secondary indexes are built one by one.
+     */
+    public static LoaderPlan loaderPlanLight = new LoaderPlan(
+        InputStage.PARSE_NODE,
+        new String[]{ "SPO" },
+        new String[]{ "GSPO" },
+        new String[][]{ { "POS" }, { "OSP" } },
+        new String[][]{ { "GPOS" }, { "GOSP" }, { "SPOG" }, { "POSG" } , { 
"OSPG" } }
+        );
+    
+    /**
      * A nearly sequential process, as a loader plan including single threaded 
first
      * phase. Each index is calculated separately but on a separate thread.
      * <p>

http://git-wip-us.apache.org/repos/asf/jena/blob/21c97633/jena-db/jena-tdb2/src/test/java/org/apache/jena/tdb2/loader/TestLoaderMain.java
----------------------------------------------------------------------
diff --git 
a/jena-db/jena-tdb2/src/test/java/org/apache/jena/tdb2/loader/TestLoaderMain.java
 
b/jena-db/jena-tdb2/src/test/java/org/apache/jena/tdb2/loader/TestLoaderMain.java
index 3596cf3..0c7093b 100644
--- 
a/jena-db/jena-tdb2/src/test/java/org/apache/jena/tdb2/loader/TestLoaderMain.java
+++ 
b/jena-db/jena-tdb2/src/test/java/org/apache/jena/tdb2/loader/TestLoaderMain.java
@@ -43,6 +43,7 @@ public class TestLoaderMain extends AbstractTestLoader {
         add(x, "Simple plan", LoaderPlans.loaderPlanSimple);
         add(x, "Minimal plan", LoaderPlans.loaderPlanMinimal);
         add(x, "Phased Plan", LoaderPlans.loaderPlanPhased);
+        add(x, "Light plan", LoaderPlans.loaderPlanLight);
         add(x, "Parallel plan", LoaderPlans.loaderPlanParallel);
         return x ; 
     }

http://git-wip-us.apache.org/repos/asf/jena/blob/21c97633/jena-db/jena-tdb2/src/test/java/org/apache/jena/tdb2/loader/TestLoaderStd.java
----------------------------------------------------------------------
diff --git 
a/jena-db/jena-tdb2/src/test/java/org/apache/jena/tdb2/loader/TestLoaderStd.java
 
b/jena-db/jena-tdb2/src/test/java/org/apache/jena/tdb2/loader/TestLoaderStd.java
index 7399423..748cf5d 100644
--- 
a/jena-db/jena-tdb2/src/test/java/org/apache/jena/tdb2/loader/TestLoaderStd.java
+++ 
b/jena-db/jena-tdb2/src/test/java/org/apache/jena/tdb2/loader/TestLoaderStd.java
@@ -24,6 +24,7 @@ import java.util.function.BiFunction;
 
 import org.apache.jena.graph.Node ;
 import org.apache.jena.sparql.core.DatasetGraph;
+import org.apache.jena.tdb2.loader.main.LoaderPlans;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameters;
@@ -41,11 +42,13 @@ public class TestLoaderStd extends AbstractTestLoader {
         BiFunction<DatasetGraph, Node, DataLoader> phased = (dsg, 
gn)->LoaderFactory.phasedLoader(dsg, gn, output);
         BiFunction<DatasetGraph, Node, DataLoader> sequential = (dsg, 
gn)->LoaderFactory.sequentialLoader(dsg, gn, output);
         BiFunction<DatasetGraph, Node, DataLoader> parallel = (dsg, 
gn)->LoaderFactory.parallelLoader(dsg, gn, output);
+        BiFunction<DatasetGraph, Node, DataLoader> light = (dsg, 
gn)->LoaderFactory.createLoader(LoaderPlans.loaderPlanLight, dsg, gn, output);
         
         x.add(new Object[]{"Basic loader", basic}) ;
         x.add(new Object[]{"Phased loader", phased}) ;
         x.add(new Object[]{"Sequential loader", sequential}) ;
         x.add(new Object[]{"Parallel loader", parallel}) ;
+        x.add(new Object[]{"Light loader", light}) ;
         return x ; 
     }
     

Reply via email to