JENA-1612: A lightly parallel loader plan. Project: http://git-wip-us.apache.org/repos/asf/jena/repo Commit: http://git-wip-us.apache.org/repos/asf/jena/commit/21c97633 Tree: http://git-wip-us.apache.org/repos/asf/jena/tree/21c97633 Diff: http://git-wip-us.apache.org/repos/asf/jena/diff/21c97633
Branch: refs/heads/master Commit: 21c976332a9444984247f85d1d2ab7a5900ed25d Parents: ecb01ec Author: Andy Seaborne <[email protected]> Authored: Sat Oct 6 17:15:22 2018 +0100 Committer: Andy Seaborne <[email protected]> Committed: Mon Oct 8 13:02:05 2018 +0100 ---------------------------------------------------------------------- jena-cmds/src/main/java/tdb2/tdbloader.java | 11 ++++++++-- .../apache/jena/tdb2/loader/LoaderFactory.java | 21 ++++++++++++++++++++ .../jena/tdb2/loader/main/DataToTuples.java | 4 ++-- .../tdb2/loader/main/DataToTuplesInline.java | 2 +- .../jena/tdb2/loader/main/InputStage.java | 2 +- .../jena/tdb2/loader/main/LoaderMain.java | 4 +++- .../jena/tdb2/loader/main/LoaderPlans.java | 21 ++++++++++++++++++++ .../apache/jena/tdb2/loader/TestLoaderMain.java | 1 + .../apache/jena/tdb2/loader/TestLoaderStd.java | 3 +++ 9 files changed, 62 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/jena/blob/21c97633/jena-cmds/src/main/java/tdb2/tdbloader.java ---------------------------------------------------------------------- diff --git a/jena-cmds/src/main/java/tdb2/tdbloader.java b/jena-cmds/src/main/java/tdb2/tdbloader.java index 52e541a..1ee896e 100644 --- a/jena-cmds/src/main/java/tdb2/tdbloader.java +++ b/jena-cmds/src/main/java/tdb2/tdbloader.java @@ -36,6 +36,7 @@ import org.apache.jena.tdb2.loader.DataLoader; import org.apache.jena.tdb2.loader.LoaderFactory; import org.apache.jena.tdb2.loader.base.LoaderOps; import org.apache.jena.tdb2.loader.base.MonitorOutput; +import org.apache.jena.tdb2.loader.main.LoaderPlans; import tdb2.cmdline.CmdTDB; import tdb2.cmdline.CmdTDBGraph; @@ -43,7 +44,7 @@ public class tdbloader extends CmdTDBGraph { private static final ArgDecl argStats = new ArgDecl(ArgDecl.HasValue, "stats"); private static final ArgDecl argLoader = new ArgDecl(ArgDecl.HasValue, "loader"); - private enum LoaderEnum { Basic, Parallel, Sequential, Phased } + private enum LoaderEnum { Basic, Parallel, Sequential, Light, Phased } private boolean showProgress = true; private boolean generateStats = false; @@ -57,7 +58,7 @@ public class tdbloader extends CmdTDBGraph { protected tdbloader(String[] argv) { super(argv); // super.add(argStats, "Generate statistics"); - super.add(argLoader, "--loader=", "Loader to use: 'basic', 'phased' (default), 'sequential' or 'parallel'"); + super.add(argLoader, "--loader=", "Loader to use: 'basic', 'phased' (default), 'sequential', 'parallel' or 'light'"); } @Override @@ -74,6 +75,10 @@ public class tdbloader extends CmdTDBGraph { loader = LoaderEnum.Sequential; else if ( loadername.matches("para.*") ) loader = LoaderEnum.Parallel; + else if ( loadername.matches("para.*") ) + loader = LoaderEnum.Parallel; + else if ( loadername.matches("light") ) + loader = LoaderEnum.Light; else throw new CmdException("Unrecognized value for --loader: "+loadername); } @@ -174,6 +179,8 @@ public class tdbloader extends CmdTDBGraph { return LoaderFactory.parallelLoader(dsg, gn, output); case Sequential : return LoaderFactory.sequentialLoader(dsg, gn, output); + case Light : + return LoaderFactory.createLoader(LoaderPlans.loaderPlanLight, dsg, output); case Basic : return LoaderFactory.basicLoader(dsg, gn, output); default : http://git-wip-us.apache.org/repos/asf/jena/blob/21c97633/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/LoaderFactory.java ---------------------------------------------------------------------- diff --git a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/LoaderFactory.java b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/LoaderFactory.java index 83f13c9..006d1c8 100644 --- a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/LoaderFactory.java +++ b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/LoaderFactory.java @@ -26,8 +26,11 @@ import org.apache.jena.sparql.core.DatasetGraph; import org.apache.jena.sparql.core.Quad; import org.apache.jena.tdb2.loader.base.MonitorOutput; import org.apache.jena.tdb2.loader.basic.LoaderBasic; +import org.apache.jena.tdb2.loader.main.LoaderMain; import org.apache.jena.tdb2.loader.main.LoaderParallel; import org.apache.jena.tdb2.loader.main.LoaderPhased; +import org.apache.jena.tdb2.loader.main.LoaderPlan; +import org.apache.jena.tdb2.loader.main.LoaderPlans; import org.apache.jena.tdb2.loader.sequential.LoaderSequential; /** Obtain a {@link DataLoader}. @@ -194,6 +197,24 @@ public class LoaderFactory { } /** + * Return a loader to load a dataset, using the provided plan. + * See {@link LoaderPlans} for the standard plans. + */ + public static DataLoader createLoader(LoaderPlan plan, DatasetGraph dsg, MonitorOutput output) { + Objects.requireNonNull(dsg); + return new LoaderMain(plan, dsg, output); + } + + /** + * Return a loader to load a graph, using the provided plan. + * See {@link LoaderPlans} for the standard plans. + */ + public static DataLoader createLoader(LoaderPlan plan, DatasetGraph dsg, Node graphName, MonitorOutput output) { + Objects.requireNonNull(dsg); + return new LoaderMain(plan, dsg, graphName, output); + } + + /** * Return a general purpose loader to load a dataset. * This default may change between versions. */ http://git-wip-us.apache.org/repos/asf/jena/blob/21c97633/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/DataToTuples.java ---------------------------------------------------------------------- diff --git a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/DataToTuples.java b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/DataToTuples.java index 02b163d..2b0cbd1 100644 --- a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/DataToTuples.java +++ b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/DataToTuples.java @@ -42,8 +42,8 @@ import org.apache.jena.tdb2.store.DatasetGraphTDB; import org.apache.jena.tdb2.store.NodeId; import org.apache.jena.tdb2.store.nodetable.NodeTable; -/** Batch processing of {@link DataBlock}s (triples or Quads) converting them to two output of - * to blocks of {@code Tuple<NodeId>}. +/** Batch processing of {@link DataBlock}s (triples or Quads) converting them to two outputs of + * blocks of {@code Tuple<NodeId>}. * <p> * This class runs one task thread. * <p> http://git-wip-us.apache.org/repos/asf/jena/blob/21c97633/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/DataToTuplesInline.java ---------------------------------------------------------------------- diff --git a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/DataToTuplesInline.java b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/DataToTuplesInline.java index c6b5075..6549425 100644 --- a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/DataToTuplesInline.java +++ b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/DataToTuplesInline.java @@ -103,7 +103,7 @@ public class DataToTuplesInline implements StreamRDFCounting, BulkStartFinish { } dispatchTriples(LoaderConst.END_TUPLES); if ( quads != null && ! quads.isEmpty() ) { - dispatchTriples(quads); + dispatchQuads(quads); quads = null; } dispatchQuads(LoaderConst.END_TUPLES); http://git-wip-us.apache.org/repos/asf/jena/blob/21c97633/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/InputStage.java ---------------------------------------------------------------------- diff --git a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/InputStage.java b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/InputStage.java index 13cb6ff..2e6c772 100644 --- a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/InputStage.java +++ b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/InputStage.java @@ -23,7 +23,7 @@ package org.apache.jena.tdb2.loader.main; * can be done in several ways. * <ul> * <li> {@code MULTI} - one thread parsing (caller), one for nodetable/tuples, and one for each index - * <li> {@code PARSE_NODE} - one thread parsing (caller) and also nodetable/tuples, and one for each index + * <li> {@code PARSE_NODE} - one thread parsing (caller) and one for nodetable/tuples, and one for each index * <li> {@code PARSE_NODE_INDEX} - use the caller thread for all operations * </ul> * {@code MULTI} is fastest when hardware allows. http://git-wip-us.apache.org/repos/asf/jena/blob/21c97633/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/LoaderMain.java ---------------------------------------------------------------------- diff --git a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/LoaderMain.java b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/LoaderMain.java index 458d4f0..8bd158d 100644 --- a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/LoaderMain.java +++ b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/LoaderMain.java @@ -133,7 +133,8 @@ public class LoaderMain extends LoaderBase implements DataLoader { } /** - * Create data ingestion and primary index building of a {@link LoaderPlan}. + * Create data ingestion and primary index building of a {@link LoaderPlan}. + * Separate threads for parsing, node table loading and primary index building. */ private static StreamRDFCounting executeData(LoaderPlan loaderPlan, DatasetGraphTDB dsgtdb, Map<String, TupleIndex> indexMap, List<BulkStartFinish> dataProcess, MonitorOutput output) { DatasetPrefixesTDB dps = (DatasetPrefixesTDB)dsgtdb.getPrefixes(); @@ -165,6 +166,7 @@ public class LoaderMain extends LoaderBase implements DataLoader { /** * Create data ingestion and primary index building of a {@link LoaderPlan}. + * One thread for parsing and node table building and one for each primary index building. * This version uses a thread for parse/NodeTable/Tuple and a thread for each of triple and quad index for phase one. */ private static StreamRDFCounting executeDataParseId(LoaderPlan loaderPlan, DatasetGraphTDB dsgtdb, Map<String, TupleIndex> indexMap, List<BulkStartFinish> dataProcess, MonitorOutput output) { http://git-wip-us.apache.org/repos/asf/jena/blob/21c97633/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/LoaderPlans.java ---------------------------------------------------------------------- diff --git a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/LoaderPlans.java b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/LoaderPlans.java index a97693c..0649c9d 100644 --- a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/LoaderPlans.java +++ b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/LoaderPlans.java @@ -62,6 +62,27 @@ public class LoaderPlans { ); /** + * Lightly parallel, intermediate plan: for triples, this is two threaded. It aims to + * speed up the data phase on a machine where an index is larger than the size of + * available RAM (not heap). In this case it is better to use RAM for better caching a + * single index than trying to work in parallel on two or more indexes. Like all load + * plans, data shape and machine characteristics affect speed so experimentation is + * recommended. + * <p> + * Data phase: One thread for parser and building the node table, and one thread for + * each primary index. + * <p> + * Index phase: Secondary indexes: one by one. + */ + public static LoaderPlan loaderPlanLight = new LoaderPlan( + InputStage.PARSE_NODE, + new String[]{ "SPO" }, + new String[]{ "GSPO" }, + new String[][]{ { "POS" }, { "OSP" } }, + new String[][]{ { "GPOS" }, { "GOSP" }, { "SPOG" }, { "POSG" } , { "OSPG" } } + ); + + /** * A nearly sequential process, as a loader plan including single threaded first * phase. Each index is calculated separately but on a separate thread. * <p> http://git-wip-us.apache.org/repos/asf/jena/blob/21c97633/jena-db/jena-tdb2/src/test/java/org/apache/jena/tdb2/loader/TestLoaderMain.java ---------------------------------------------------------------------- diff --git a/jena-db/jena-tdb2/src/test/java/org/apache/jena/tdb2/loader/TestLoaderMain.java b/jena-db/jena-tdb2/src/test/java/org/apache/jena/tdb2/loader/TestLoaderMain.java index 3596cf3..0c7093b 100644 --- a/jena-db/jena-tdb2/src/test/java/org/apache/jena/tdb2/loader/TestLoaderMain.java +++ b/jena-db/jena-tdb2/src/test/java/org/apache/jena/tdb2/loader/TestLoaderMain.java @@ -43,6 +43,7 @@ public class TestLoaderMain extends AbstractTestLoader { add(x, "Simple plan", LoaderPlans.loaderPlanSimple); add(x, "Minimal plan", LoaderPlans.loaderPlanMinimal); add(x, "Phased Plan", LoaderPlans.loaderPlanPhased); + add(x, "Light plan", LoaderPlans.loaderPlanLight); add(x, "Parallel plan", LoaderPlans.loaderPlanParallel); return x ; } http://git-wip-us.apache.org/repos/asf/jena/blob/21c97633/jena-db/jena-tdb2/src/test/java/org/apache/jena/tdb2/loader/TestLoaderStd.java ---------------------------------------------------------------------- diff --git a/jena-db/jena-tdb2/src/test/java/org/apache/jena/tdb2/loader/TestLoaderStd.java b/jena-db/jena-tdb2/src/test/java/org/apache/jena/tdb2/loader/TestLoaderStd.java index 7399423..748cf5d 100644 --- a/jena-db/jena-tdb2/src/test/java/org/apache/jena/tdb2/loader/TestLoaderStd.java +++ b/jena-db/jena-tdb2/src/test/java/org/apache/jena/tdb2/loader/TestLoaderStd.java @@ -24,6 +24,7 @@ import java.util.function.BiFunction; import org.apache.jena.graph.Node ; import org.apache.jena.sparql.core.DatasetGraph; +import org.apache.jena.tdb2.loader.main.LoaderPlans; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import org.junit.runners.Parameterized.Parameters; @@ -41,11 +42,13 @@ public class TestLoaderStd extends AbstractTestLoader { BiFunction<DatasetGraph, Node, DataLoader> phased = (dsg, gn)->LoaderFactory.phasedLoader(dsg, gn, output); BiFunction<DatasetGraph, Node, DataLoader> sequential = (dsg, gn)->LoaderFactory.sequentialLoader(dsg, gn, output); BiFunction<DatasetGraph, Node, DataLoader> parallel = (dsg, gn)->LoaderFactory.parallelLoader(dsg, gn, output); + BiFunction<DatasetGraph, Node, DataLoader> light = (dsg, gn)->LoaderFactory.createLoader(LoaderPlans.loaderPlanLight, dsg, gn, output); x.add(new Object[]{"Basic loader", basic}) ; x.add(new Object[]{"Phased loader", phased}) ; x.add(new Object[]{"Sequential loader", sequential}) ; x.add(new Object[]{"Parallel loader", parallel}) ; + x.add(new Object[]{"Light loader", light}) ; return x ; }
