Repository: giraph Updated Branches: refs/heads/trunk 79e7f1c98 -> 17355f558
[GIRAPH-1013] Add BlockExecutionTest Summary: Add support for executing single blocks, as well as adding a test for core of the framework Equivalent to internal https://phabricator.fb.com/D2137589 diff. Test Plan: mvn clean install Reviewers: maja.kabiljo, dionysis.logothetis, sergey.edunov Reviewed By: sergey.edunov Differential Revision: https://reviews.facebook.net/D39873 Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/17355f55 Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/17355f55 Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/17355f55 Branch: refs/heads/trunk Commit: 17355f55811be1b1392c3ca066fb9adf803846d3 Parents: 79e7f1c Author: Igor Kabiljo <[email protected]> Authored: Mon Jun 8 16:24:45 2015 -0700 Committer: Igor Kabiljo <[email protected]> Committed: Fri Jun 12 20:35:52 2015 -0700 ---------------------------------------------------------------------- .../giraph/block_app/framework/BlockUtils.java | 52 ++++--- .../framework/api/local/LocalBlockRunner.java | 106 ++++++++++--- .../framework/internal/BlockMasterLogic.java | 23 ++- .../block_app/framework/BlockExecutionTest.java | 156 +++++++++++++++++++ 4 files changed, 290 insertions(+), 47 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/giraph/blob/17355f55/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/BlockUtils.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/BlockUtils.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/BlockUtils.java index df260f5..3175a55 100644 --- a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/BlockUtils.java +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/BlockUtils.java @@ -54,7 +54,8 @@ public class BlockUtils { /** Property describing BlockFactory to use for current application run */ public static final ClassConfOption<Object> BLOCK_WORKER_CONTEXT_VALUE_CLASS = ClassConfOption.create( - "digraph.block_worker_context_value_class", null, Object.class, + "digraph.block_worker_context_value_class", + Object.class, Object.class, "block worker context value class"); private static final Logger LOG = Logger.getLogger(BlockUtils.class); @@ -136,6 +137,34 @@ public class BlockUtils { // Create blocks to detect issues before creating a Giraph job // They will not be used here Block executionBlock = blockFactory.createBlock(immConf); + checkBlockTypes( + executionBlock, blockFactory.createExecutionStage(immConf), + conf, immConf); + + // check for non 'static final' fields in BlockFactories + Class<?> bfClass = blockFactory.getClass(); + while (!bfClass.equals(Object.class)) { + for (Field field : bfClass.getDeclaredFields()) { + if (!Modifier.isStatic(field.getModifiers()) || + !Modifier.isFinal(field.getModifiers())) { + throw new IllegalStateException("BlockFactory (" + bfClass + + ") cannot have any mutable (non 'static final') fields as a " + + "safety measure, as createBlock function is called from a " + + "different context then all other functions, use conf argument " + + "instead, or make it 'static final'. Field present: " + field); + } + } + bfClass = bfClass.getSuperclass(); + } + + // Register outputs + blockFactory.registerOutputs(conf); + } + + public static void checkBlockTypes( + Block executionBlock, Object executionStage, + GiraphConfiguration conf, + final ImmutableClassesGiraphConfiguration immConf) { LOG.info("Executing application - " + executionBlock); final Class<?> vertexIdClass = GiraphConstants.VERTEX_ID_CLASS.get(conf); @@ -146,7 +175,7 @@ public class BlockUtils { final Class<?> workerContextValueClass = BlockUtils.BLOCK_WORKER_CONTEXT_VALUE_CLASS.get(conf); final Class<?> executionStageClass = - blockFactory.createExecutionStage(conf).getClass(); + executionStage.getClass(); // Check for type inconsistencies executionBlock.forAllPossiblePieces(new Consumer<AbstractPiece>() { @@ -183,24 +212,5 @@ public class BlockUtils { } } }); - - // check for non 'static final' fields in BlockFactories - Class<?> bfClass = blockFactory.getClass(); - while (!bfClass.equals(Object.class)) { - for (Field field : bfClass.getDeclaredFields()) { - if (!Modifier.isStatic(field.getModifiers()) || - !Modifier.isFinal(field.getModifiers())) { - throw new IllegalStateException("BlockFactory (" + bfClass + - ") cannot have any mutable (non 'static final') fields as a " + - "safety measure, as createBlock function is called from a " + - "different context then all other functions, use conf argument " + - "instead, or make it 'static final'. Field present: " + field); - } - } - bfClass = bfClass.getSuperclass(); - } - - // Register outputs - blockFactory.registerOutputs(conf); } } http://git-wip-us.apache.org/repos/asf/giraph/blob/17355f55/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/LocalBlockRunner.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/LocalBlockRunner.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/LocalBlockRunner.java index bdf3233..ea6817f 100644 --- a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/LocalBlockRunner.java +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/LocalBlockRunner.java @@ -27,7 +27,10 @@ import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; +import org.apache.giraph.block_app.framework.BlockFactory; +import org.apache.giraph.block_app.framework.BlockUtils; import org.apache.giraph.block_app.framework.api.local.InternalApi.InternalWorkerApi; +import org.apache.giraph.block_app.framework.block.Block; import org.apache.giraph.block_app.framework.internal.BlockMasterLogic; import org.apache.giraph.block_app.framework.internal.BlockWorkerContextLogic; import org.apache.giraph.block_app.framework.internal.BlockWorkerLogic; @@ -50,12 +53,20 @@ import com.google.common.base.Preconditions; import com.google.common.collect.Iterables; /** - * Local in-memory Block application job runner, used for testing. + * Local in-memory Block application job runner. + * Implementation should be faster then using InternalVertexRunner. + * + * Useful for fast testing. */ @SuppressWarnings({ "rawtypes", "unchecked" }) public class LocalBlockRunner { - public static final IntConfOption NUM_WORKERS = new IntConfOption( - "test.LocalBlockRunner.NUM_WORKERS", 3, ""); + /** Number of threads to use */ + public static final IntConfOption NUM_THREADS = new IntConfOption( + "test.LocalBlockRunner.NUM_THREADS", 3, ""); + /** + * Whether to run all supported checks. Disable if you are running this + * not within a unit test, and on a large graph, where performance matters. + */ public static final BooleanConfOption RUN_ALL_CHECKS = new BooleanConfOption( "test.LocalBlockRunner.RUN_ALL_CHECKS", true, ""); // merge into RUN_ALL_CHECKS, after SERIALIZE_MASTER starts working @@ -66,41 +77,77 @@ public class LocalBlockRunner { private LocalBlockRunner() { } /** + * Run Block Application specified within the conf, on a given graph, + * locally, in-memory. + * * With a boolean flag, you can switch between LocalBlockRunner and - * InternalVertexRunner for running the unit test. + * InternalVertexRunner implementations of local in-memory computation. */ public static <I extends WritableComparable, V extends Writable, E extends Writable> - TestGraph<I, V, E> runWithInMemoryOutput( + TestGraph<I, V, E> runApp( TestGraph<I, V, E> graph, GiraphConfiguration conf, boolean useFullDigraphTests) throws Exception { if (useFullDigraphTests) { return InternalVertexRunner.runWithInMemoryOutput(conf, graph); } else { - runWithInMemoryOutput(graph, conf); + runApp(graph, conf); return graph; } } + /** + * Run Block Application specified within the conf, on a given graph, + * locally, in-memory. + */ public static <I extends WritableComparable, V extends Writable, E extends Writable> - void runWithInMemoryOutput( - TestGraph<I, V, E> graph, GiraphConfiguration conf) throws Exception { - VertexSaver<I, V, E> noOpVertexSaver = new VertexSaver<I, V, E>() { - @Override - public void saveVertex(Vertex<I, V, E> vertex) { - // No-op - } - }; - runWithVertexSaverOutput(graph, noOpVertexSaver, conf); + void runApp(TestGraph<I, V, E> graph, GiraphConfiguration conf) { + VertexSaver<I, V, E> noOpVertexSaver = noOpVertexSaver(); + runAppWithVertexOutput(graph, noOpVertexSaver, conf); } + /** + * Run Block from a specified execution stage on a given graph, + * locally, in-memory. + */ + public static + <I extends WritableComparable, V extends Writable, E extends Writable> + void runBlock( + TestGraph<I, V, E> graph, Block block, Object executionStage, + GiraphConfiguration conf) { + VertexSaver<I, V, E> noOpVertexSaver = noOpVertexSaver(); + runBlockWithVertexOutput( + block, executionStage, graph, noOpVertexSaver, conf); + } + + + /** + * Run Block Application specified within the conf, on a given graph, + * locally, in-memory, with a given vertexSaver. + */ public static <I extends WritableComparable, V extends Writable, E extends Writable> - void runWithVertexSaverOutput( + void runAppWithVertexOutput( TestGraph<I, V, E> graph, final VertexSaver<I, V, E> vertexSaver, - GiraphConfiguration conf) throws Exception { - int numWorkers = NUM_WORKERS.get(conf); + GiraphConfiguration conf) { + BlockFactory<?> factory = BlockUtils.createBlockFactory(conf); + runBlockWithVertexOutput( + factory.createBlock(conf), factory.createExecutionStage(conf), + graph, vertexSaver, conf); + } + + /** + * Run Block from a specified execution stage on a given graph, + * locally, in-memory, with a given vertexSaver. + */ + public static + <I extends WritableComparable, V extends Writable, E extends Writable> + void runBlockWithVertexOutput( + Block block, Object executionStage, TestGraph<I, V, E> graph, + final VertexSaver<I, V, E> vertexSaver, GiraphConfiguration conf + ) { + int numWorkers = NUM_THREADS.get(conf); boolean runAllChecks = RUN_ALL_CHECKS.get(conf); boolean serializeMaster = SERIALIZE_MASTER.get(conf); final boolean doOutputDuringComputation = conf.doOutputDuringComputation(); @@ -111,8 +158,10 @@ public class LocalBlockRunner { new InternalApi(graph, immConf, runAllChecks); final InternalWorkerApi internalWorkerApi = internalApi.getWorkerApi(); + BlockUtils.checkBlockTypes(block, executionStage, conf, immConf); + BlockMasterLogic<Object> blockMasterLogic = new BlockMasterLogic<>(); - blockMasterLogic.initialize(immConf, internalApi); + blockMasterLogic.initialize(block, executionStage, internalApi); BlockWorkerContextLogic workerContextLogic = internalApi.getWorkerContextLogic(); @@ -231,7 +280,12 @@ public class LocalBlockRunner { }); } - latch.await(); + try { + latch.await(); + } catch (InterruptedException e) { + throw new RuntimeException("Thread intentionally interrupted", e); + } + if (exception.get() != null) { throw new RuntimeException("Worker failed", exception.get()); } @@ -244,4 +298,16 @@ public class LocalBlockRunner { workerContextLogic.postApplication(); } + + private static + <I extends WritableComparable, E extends Writable, V extends Writable> + VertexSaver<I, V, E> noOpVertexSaver() { + return new VertexSaver<I, V, E>() { + @Override + public void saveVertex(Vertex<I, V, E> vertex) { + // No-op + } + }; + } + } http://git-wip-us.apache.org/repos/asf/giraph/blob/17355f55/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/internal/BlockMasterLogic.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/internal/BlockMasterLogic.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/internal/BlockMasterLogic.java index 3b87372..4892a33 100644 --- a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/internal/BlockMasterLogic.java +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/internal/BlockMasterLogic.java @@ -49,14 +49,26 @@ public class BlockMasterLogic<S> { private BlockWorkerPieces previousWorkerPieces; private boolean computationDone; + /** + * Initialize master logic to execute BlockFactory defined in + * the configuration. + */ public void initialize( - GiraphConfiguration conf, final BlockMasterApi masterApi) - throws InstantiationException, IllegalAccessException { + GiraphConfiguration conf, final BlockMasterApi masterApi) { + BlockFactory<S> factory = BlockUtils.createBlockFactory(conf); + initialize(factory.createBlock(conf), factory.createExecutionStage(conf), + masterApi); + } + + /** + * Initialize Master Logic to execute given block, starting + * with given executionStage. + */ + public void initialize( + Block executionBlock, S executionStage, final BlockMasterApi masterApi) { this.masterApi = masterApi; this.computationDone = false; - BlockFactory<S> factory = BlockUtils.createBlockFactory(conf); - Block executionBlock = factory.createBlock(conf); LOG.info("Executing application - " + executionBlock); // We register all possible aggregators at the beginning @@ -82,8 +94,7 @@ public class BlockMasterLogic<S> { // iterating. So passing piece as null, and initial state as current state, // so that nothing get's executed in first half, and calculateNextState // returns initial state. - previousPiece = new PairedPieceAndStage<>( - null, factory.createExecutionStage(conf)); + previousPiece = new PairedPieceAndStage<>(null, executionStage); } /** http://git-wip-us.apache.org/repos/asf/giraph/blob/17355f55/giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/BlockExecutionTest.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/BlockExecutionTest.java b/giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/BlockExecutionTest.java new file mode 100644 index 0000000..b77c797 --- /dev/null +++ b/giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/BlockExecutionTest.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.block_app.framework; + +import org.apache.giraph.block_app.framework.api.BlockMasterApi; +import org.apache.giraph.block_app.framework.api.BlockWorkerReceiveApi; +import org.apache.giraph.block_app.framework.api.BlockWorkerSendApi; +import org.apache.giraph.block_app.framework.api.CreateReducersApi; +import org.apache.giraph.block_app.framework.api.local.LocalBlockRunner; +import org.apache.giraph.block_app.framework.piece.Piece; +import org.apache.giraph.block_app.framework.piece.global_comm.ReducerHandle; +import org.apache.giraph.block_app.framework.piece.interfaces.VertexReceiver; +import org.apache.giraph.block_app.framework.piece.interfaces.VertexSender; +import org.apache.giraph.conf.GiraphConfiguration; +import org.apache.giraph.conf.GiraphConstants; +import org.apache.giraph.graph.Vertex; +import org.apache.giraph.reducers.impl.SumReduce; +import org.apache.giraph.types.NoMessage; +import org.apache.giraph.utils.TestGraph; +import org.apache.hadoop.io.BooleanWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.junit.Assert; +import org.junit.Test; +import org.python.google.common.collect.Iterables; + +/** + * Test of barebones of Blocks Framework. + * + * Do not look as an example of unit test, or to learn about the Framework, + * there are utilities to do things simpler, that we are not trying to test + * here. + */ +public class BlockExecutionTest { + + private static GiraphConfiguration createConf() { + GiraphConfiguration conf = new GiraphConfiguration(); + GiraphConstants.VERTEX_ID_CLASS.set(conf, LongWritable.class); + GiraphConstants.VERTEX_VALUE_CLASS.set(conf, LongWritable.class); + GiraphConstants.EDGE_VALUE_CLASS.set(conf, NullWritable.class); + return conf; + } + + private static TestGraph<LongWritable, LongWritable, NullWritable> createTestGraph( + GiraphConfiguration conf) { + TestGraph<LongWritable, LongWritable, NullWritable> graph = + new TestGraph<LongWritable, LongWritable, NullWritable>(conf); + graph.addVertex(new LongWritable(1), new LongWritable()); + graph.addVertex(new LongWritable(2), new LongWritable()); + graph.addVertex(new LongWritable(3), new LongWritable()); + graph.addVertex(new LongWritable(4), new LongWritable()); + + graph.addEdge(new LongWritable(1), new LongWritable(2), NullWritable.get()); + graph.addEdge(new LongWritable(2), new LongWritable(1), NullWritable.get()); + graph.addEdge(new LongWritable(2), new LongWritable(3), NullWritable.get()); + graph.addEdge(new LongWritable(3), new LongWritable(2), NullWritable.get()); + return graph; + } + + @Test + public void testMessageSending() { + GiraphConfiguration conf = createConf(); + TestGraph<LongWritable, LongWritable, NullWritable> graph = createTestGraph(conf); + + LocalBlockRunner.runBlock(graph, new Piece<WritableComparable, LongWritable, Writable, BooleanWritable, Object>() { + @Override + public VertexSender<WritableComparable, LongWritable, Writable> getVertexSender( + final BlockWorkerSendApi<WritableComparable, LongWritable, Writable, BooleanWritable> workerApi, + Object executionStage) { + return new InnerVertexSender() { + @Override + public void vertexSend(Vertex<WritableComparable, LongWritable, Writable> vertex) { + workerApi.sendMessageToAllEdges(vertex, new BooleanWritable()); + } + }; + } + + @Override + public VertexReceiver<WritableComparable, LongWritable, Writable, BooleanWritable> + getVertexReceiver(BlockWorkerReceiveApi<WritableComparable> workerApi, + Object executionStage) { + return new InnerVertexReceiver() { + @Override + public void vertexReceive(Vertex<WritableComparable, LongWritable, Writable> vertex, + Iterable<BooleanWritable> messages) { + vertex.getValue().set(Iterables.size(messages)); + } + }; + } + + @Override + protected Class<BooleanWritable> getMessageClass() { + return BooleanWritable.class; + } + }, new Object(), conf); + + Assert.assertEquals(1, graph.getVertex(new LongWritable(1)).getValue().get()); + Assert.assertEquals(2, graph.getVertex(new LongWritable(2)).getValue().get()); + Assert.assertEquals(1, graph.getVertex(new LongWritable(3)).getValue().get()); + Assert.assertEquals(0, graph.getVertex(new LongWritable(4)).getValue().get()); + } + + @Test + public void testReducing() { + GiraphConfiguration conf = createConf(); + TestGraph<LongWritable, LongWritable, NullWritable> graph = createTestGraph(conf); + + final LongWritable value = new LongWritable(); + + LocalBlockRunner.runBlock(graph, new Piece<WritableComparable, Writable, Writable, NoMessage, Object>() { + private ReducerHandle<LongWritable, LongWritable> numVertices; + + @Override + public void registerReducers(CreateReducersApi reduceApi, Object executionStage) { + numVertices = reduceApi.createLocalReducer(SumReduce.LONG); + } + + @Override + public VertexSender<WritableComparable, Writable, Writable> getVertexSender( + BlockWorkerSendApi<WritableComparable, Writable, Writable, NoMessage> workerApi, + Object executionStage) { + + return new InnerVertexSender() { + @Override + public void vertexSend(Vertex<WritableComparable, Writable, Writable> vertex) { + numVertices.reduce(new LongWritable(1)); + } + }; + } + + @Override + public void masterCompute(BlockMasterApi masterApi, Object executionStage) { + value.set(numVertices.getReducedValue(masterApi).get()); + } + }, new Object(), conf); + + Assert.assertEquals(4, value.get()); + } +}
