Repository: giraph Updated Branches: refs/heads/trunk 0b1962253 -> 1c5544248
Block API handle Summary: - Some apps need a reference to the Block API objects (e.g. BlockOutputApi) before they are actually executed. See documentation of `BlockApiHandle` for more details. - Also, made `BlockWorkerApi` implement the `BlockOutputApi`, as opposed to only the `BlockWorkerReceiveApi`, so that output is possible inside the sender too. Test Plan: - `mvn install` - internal app that uses the api handle from the master and from the workers - internal snapshot tests Reviewers: maja.kabiljo, sergey.edunov, ikabiljo Reviewed By: ikabiljo Differential Revision: https://reviews.facebook.net/D57939 Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/1c554424 Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/1c554424 Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/1c554424 Branch: refs/heads/trunk Commit: 1c554424859377458fe3d808692122403001304f Parents: 0b19622 Author: Dionysios Logothetis <[email protected]> Authored: Tue May 10 10:41:40 2016 -0700 Committer: Igor Kabiljo <[email protected]> Committed: Tue May 10 10:41:40 2016 -0700 ---------------------------------------------------------------------- .../giraph/block_app/framework/BlockUtils.java | 2 +- .../block_app/framework/api/BlockApiHandle.java | 118 +++++++++++ .../block_app/framework/api/BlockWorkerApi.java | 2 +- .../framework/api/BlockWorkerReceiveApi.java | 2 +- .../framework/block/BlockWithApiHandle.java | 28 +++ .../framework/internal/BlockMasterLogic.java | 16 +- .../internal/BlockWorkerContextLogic.java | 2 + .../framework/internal/BlockWorkerLogic.java | 2 + .../framework/internal/BlockWorkerPieces.java | 10 +- .../block_app/framework/BlockApiHandleTest.java | 211 +++++++++++++++++++ 10 files changed, 387 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/giraph/blob/1c554424/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/BlockUtils.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/BlockUtils.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/BlockUtils.java index 6bf6d92..49f23c5 100644 --- a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/BlockUtils.java +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/BlockUtils.java @@ -52,7 +52,7 @@ public class BlockUtils { ClassConfOption.create("digraph.block_factory", null, BlockFactory.class, "block factory describing giraph job"); - /** Property describing BlockFactory to use for current application run */ + /** Property describing block worker context value class to use */ public static final ClassConfOption<Object> BLOCK_WORKER_CONTEXT_VALUE_CLASS = ClassConfOption.create( "digraph.block_worker_context_value_class", http://git-wip-us.apache.org/repos/asf/giraph/blob/1c554424/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/BlockApiHandle.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/BlockApiHandle.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/BlockApiHandle.java new file mode 100644 index 0000000..4c52826 --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/BlockApiHandle.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.block_app.framework.api; + +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * Class that contains references to Block Api objects. + * + * One general use-case for this is for applications to indirectly get a handle + * on the Block Api objects and implement operations that (i) depend on the + * Block Api interfaces, (ii) are not in the context of a Piece when defined, + * and (iii) are in the context of a Piece when executed. + * + * To do this, as opposed to defining an application as a {@link Block}, define + * your application as a {@link BlockWithApiHandle}. + * + * NOTE: Depending on the context in which this class is used, some of the + * handles may not be set. For instance, the {@link masterApi} is not set when + * this is in the context of a worker. Trying to get access to a handle when + * it is not set will result in a runtime exception. Instead, you should first + * use methods like the {@link #isMasterApiSet()} to check. + * + * The *Api fields are transient as we do not need/want to serialize them. They + * will be set at the appropriate time by the framework. 
+ */ +public class BlockApiHandle { + private transient BlockMasterApi masterApi; + private transient BlockWorkerReceiveApi workerReceiveApi; + private transient BlockWorkerSendApi workerSendApi; + private transient BlockWorkerContextReceiveApi workerContextReceiveApi; + private transient BlockWorkerContextSendApi workerContextSendApi; + + public void setMasterApi(BlockMasterApi api) { + this.masterApi = api; + } + + public void setWorkerReceiveApi(BlockWorkerReceiveApi api) { + this.workerReceiveApi = api; + } + + public void setWorkerSendApi(BlockWorkerSendApi api) { + this.workerSendApi = api; + } + + public void setWorkerContextReceiveApi(BlockWorkerContextReceiveApi api) { + this.workerContextReceiveApi = api; + } + + public void setWorkerContextSendApi(BlockWorkerContextSendApi api) { + this.workerContextSendApi = api; + } + + public boolean isMasterApiSet() { + return masterApi != null; + } + + public boolean isWorkerReceiveApiSet() { + return workerReceiveApi != null; + } + + public boolean isWorkerSendApiSet() { + return workerSendApi != null; + } + + public boolean isWorkerContextReceiveApiSet() { + return workerContextReceiveApi != null; + } + + public boolean isWorkerContextSendApiSet() { + return workerContextSendApi != null; + } + + public BlockMasterApi getMasterApi() { + checkNotNull(masterApi, + "BlockMasterApi not valid in this context."); + return masterApi; + } + + public BlockWorkerReceiveApi getWorkerReceiveApi() { + checkNotNull(workerReceiveApi, + "BlockWorkerReceiveApi not valid in this context."); + return workerReceiveApi; + } + + public BlockWorkerSendApi getWorkerSendApi() { + checkNotNull(workerSendApi, + "BlockWorkerSendApi not valid in this context."); + return workerSendApi; + } + + public BlockWorkerContextReceiveApi getWorkerContextReceiveApi() { + checkNotNull(workerContextReceiveApi, + "BlockWorkerContextReceiveApi not valid in this context"); + return workerContextReceiveApi; + } + + public BlockWorkerContextSendApi 
getWorkerContextSendApi() { + checkNotNull(workerContextSendApi, + "BlockWorkerContextSendApi not valid in this context"); + return workerContextSendApi; + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/1c554424/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/BlockWorkerApi.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/BlockWorkerApi.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/BlockWorkerApi.java index 76898f6..f6b3551 100644 --- a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/BlockWorkerApi.java +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/BlockWorkerApi.java @@ -30,7 +30,7 @@ import org.apache.hadoop.io.WritableComparable; */ @SuppressWarnings("rawtypes") public interface BlockWorkerApi<I extends WritableComparable> - extends BlockApi, AggregatorUsage, WorkerIndexUsage<I> { + extends BlockApi, BlockOutputApi, AggregatorUsage, WorkerIndexUsage<I> { @Override ImmutableClassesGiraphConfiguration<I, ?, ?> getConf(); } http://git-wip-us.apache.org/repos/asf/giraph/blob/1c554424/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/BlockWorkerReceiveApi.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/BlockWorkerReceiveApi.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/BlockWorkerReceiveApi.java index 6db51bd..b99d319 100644 --- a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/BlockWorkerReceiveApi.java +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/BlockWorkerReceiveApi.java @@ -29,5 +29,5 @@ import org.apache.hadoop.io.WritableComparable; */ @SuppressWarnings("rawtypes") public interface 
BlockWorkerReceiveApi<I extends WritableComparable> - extends BlockWorkerApi<I>, WorkerBroadcastUsage, BlockOutputApi { + extends BlockWorkerApi<I>, WorkerBroadcastUsage { } http://git-wip-us.apache.org/repos/asf/giraph/blob/1c554424/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/block/BlockWithApiHandle.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/block/BlockWithApiHandle.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/block/BlockWithApiHandle.java new file mode 100644 index 0000000..f2dad87 --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/block/BlockWithApiHandle.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.block_app.framework.block; + +import org.apache.giraph.block_app.framework.api.BlockApiHandle; + +/** + * Applications that need access to a {@link BlockApiHandle} should return a + * {@link Block} of this type. 
+ */ +public interface BlockWithApiHandle extends Block { + BlockApiHandle getBlockApiHandle(); +} http://git-wip-us.apache.org/repos/asf/giraph/blob/1c554424/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/internal/BlockMasterLogic.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/internal/BlockMasterLogic.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/internal/BlockMasterLogic.java index bd86c21..b6167d9 100644 --- a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/internal/BlockMasterLogic.java +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/internal/BlockMasterLogic.java @@ -26,9 +26,11 @@ import java.util.TreeMap; import org.apache.commons.lang3.time.DurationFormatUtils; import org.apache.giraph.block_app.framework.BlockFactory; import org.apache.giraph.block_app.framework.BlockUtils; +import org.apache.giraph.block_app.framework.api.BlockApiHandle; import org.apache.giraph.block_app.framework.api.BlockMasterApi; import org.apache.giraph.block_app.framework.api.BlockOutputHandleAccessor; import org.apache.giraph.block_app.framework.block.Block; +import org.apache.giraph.block_app.framework.block.BlockWithApiHandle; import org.apache.giraph.block_app.framework.piece.AbstractPiece; import org.apache.giraph.conf.GiraphConfiguration; import org.apache.giraph.function.Consumer; @@ -53,6 +55,7 @@ public class BlockMasterLogic<S> { private long lastTimestamp = -1; private BlockWorkerPieces previousWorkerPieces; private boolean computationDone; + private BlockApiHandle blockApiHandle; /** Tracks elapsed time on master for each distinct Piece */ private final TimeStatsPerEvent masterPerPieceTimeStats = @@ -82,6 +85,14 @@ public class BlockMasterLogic<S> { this.computationDone = false; LOG.info("Executing application - " + executionBlock); + if (executionBlock 
instanceof BlockWithApiHandle) { + blockApiHandle = + ((BlockWithApiHandle) executionBlock).getBlockApiHandle(); + } + if (blockApiHandle == null) { + blockApiHandle = new BlockApiHandle(); + } + blockApiHandle.setMasterApi(masterApi); // We register all possible aggregators at the beginning executionBlock.forAllPossiblePieces(new Consumer<AbstractPiece>() { @@ -89,7 +100,7 @@ public class BlockMasterLogic<S> { @SuppressWarnings("deprecation") @Override public void apply(AbstractPiece piece) { - // no need to regiser the same piece twice. + // no need to register the same piece twice. if (registeredPieces.add(piece)) { try { piece.registerAggregators(masterApi); @@ -178,7 +189,8 @@ public class BlockMasterLogic<S> { postApplication(); result = null; } else { - result = new BlockWorkerPieces<>(previousPiece, nextPiece); + result = new BlockWorkerPieces<>( + previousPiece, nextPiece, blockApiHandle); if (logExecutionStatus) { LOG.info("Master in " + superstep + " superstep passing " + result + " to be executed"); http://git-wip-us.apache.org/repos/asf/giraph/blob/1c554424/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/internal/BlockWorkerContextLogic.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/internal/BlockWorkerContextLogic.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/internal/BlockWorkerContextLogic.java index ca2bb5a..a58f78b 100644 --- a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/internal/BlockWorkerContextLogic.java +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/internal/BlockWorkerContextLogic.java @@ -65,6 +65,8 @@ public class BlockWorkerContextLogic { BlockWorkerContextSendApi sendApi, BlockWorkerPieces workerPieces, long superstep, List<Writable> messages) { + workerPieces.getBlockApiHandle().setWorkerContextReceiveApi(receiveApi); + 
workerPieces.getBlockApiHandle().setWorkerContextSendApi(sendApi); if (BlockUtils.LOG_EXECUTION_STATUS.get(receiveApi.getConf())) { LOG.info("Worker executing " + workerPieces + " in " + superstep + " superstep"); http://git-wip-us.apache.org/repos/asf/giraph/blob/1c554424/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/internal/BlockWorkerLogic.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/internal/BlockWorkerLogic.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/internal/BlockWorkerLogic.java index 844160c..d4f6c3f 100644 --- a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/internal/BlockWorkerLogic.java +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/internal/BlockWorkerLogic.java @@ -40,6 +40,8 @@ public class BlockWorkerLogic { public void preSuperstep( BlockWorkerReceiveApi receiveApi, BlockWorkerSendApi sendApi) { + pieces.getBlockApiHandle().setWorkerReceiveApi(receiveApi); + pieces.getBlockApiHandle().setWorkerSendApi(sendApi); if (pieces.getReceiver() != null) { receiveFunctions = pieces.getReceiver().getVertexReceiver(receiveApi); } http://git-wip-us.apache.org/repos/asf/giraph/blob/1c554424/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/internal/BlockWorkerPieces.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/internal/BlockWorkerPieces.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/internal/BlockWorkerPieces.java index 3b38cfa..545237d 100644 --- a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/internal/BlockWorkerPieces.java +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/internal/BlockWorkerPieces.java @@ -21,6 +21,7 @@ import 
java.util.ArrayList; import java.util.Arrays; import java.util.Objects; +import org.apache.giraph.block_app.framework.api.BlockApiHandle; import org.apache.giraph.comm.messages.MessageEncodeAndStoreType; import org.apache.giraph.conf.DefaultMessageClasses; import org.apache.giraph.conf.GiraphConstants; @@ -52,11 +53,14 @@ public class BlockWorkerPieces<S> { private final PairedPieceAndStage<S> receiver; private final PairedPieceAndStage<S> sender; + private final BlockApiHandle blockApiHandle; public BlockWorkerPieces( - PairedPieceAndStage<S> receiver, PairedPieceAndStage<S> sender) { + PairedPieceAndStage<S> receiver, PairedPieceAndStage<S> sender, + BlockApiHandle blockApiHandle) { this.receiver = receiver; this.sender = sender; + this.blockApiHandle = blockApiHandle; } public PairedPieceAndStage<S> getReceiver() { @@ -67,6 +71,10 @@ public class BlockWorkerPieces<S> { return sender; } + public BlockApiHandle getBlockApiHandle() { + return blockApiHandle; + } + public MessageClasses getOutgoingMessageClasses( ImmutableClassesGiraphConfiguration conf) { MessageClasses messageClasses; http://git-wip-us.apache.org/repos/asf/giraph/blob/1c554424/giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/BlockApiHandleTest.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/BlockApiHandleTest.java b/giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/BlockApiHandleTest.java new file mode 100644 index 0000000..328b45d --- /dev/null +++ b/giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/BlockApiHandleTest.java @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.block_app.framework; + +import org.apache.giraph.block_app.framework.api.BlockApiHandle; +import org.apache.giraph.block_app.framework.api.BlockMasterApi; +import org.apache.giraph.block_app.framework.api.BlockWorkerContextReceiveApi; +import org.apache.giraph.block_app.framework.api.BlockWorkerContextSendApi; +import org.apache.giraph.block_app.framework.api.BlockWorkerReceiveApi; +import org.apache.giraph.block_app.framework.api.BlockWorkerSendApi; +import org.apache.giraph.block_app.framework.api.local.LocalBlockRunner; +import org.apache.giraph.block_app.framework.block.Block; +import org.apache.giraph.block_app.framework.block.BlockWithApiHandle; +import org.apache.giraph.block_app.framework.piece.AbstractPiece; +import org.apache.giraph.block_app.framework.piece.DefaultParentPiece; +import org.apache.giraph.block_app.framework.piece.interfaces.VertexReceiver; +import org.apache.giraph.block_app.framework.piece.interfaces.VertexSender; +import org.apache.giraph.conf.GiraphConfiguration; +import org.apache.giraph.conf.GiraphConstants; +import org.apache.giraph.function.Consumer; +import org.apache.giraph.graph.Vertex; +import org.apache.giraph.utils.TestGraph; +import org.apache.hadoop.io.DoubleWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Writable; +import 
org.apache.hadoop.io.WritableComparable; +import org.junit.Test; + +import java.util.Iterator; +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; + +/** + * Test the use of {@link BlockApiHandle}. + */ +public class BlockApiHandleTest { + + private static GiraphConfiguration createConf() { + GiraphConfiguration conf = new GiraphConfiguration(); + GiraphConstants.VERTEX_ID_CLASS.set(conf, LongWritable.class); + GiraphConstants.VERTEX_VALUE_CLASS.set(conf, LongWritable.class); + GiraphConstants.EDGE_VALUE_CLASS.set(conf, NullWritable.class); + return conf; + } + + private static TestGraph<LongWritable, LongWritable, NullWritable> + createTestGraph() { + TestGraph<LongWritable, LongWritable, NullWritable> graph = + new TestGraph<>(createConf()); + graph.addVertex(new LongWritable(1), new LongWritable()); + graph.addVertex(new LongWritable(2), new LongWritable()); + graph.addVertex(new LongWritable(3), new LongWritable()); + graph.addVertex(new LongWritable(4), new LongWritable()); + graph.addEdge(new LongWritable(1), new LongWritable(2), NullWritable.get()); + graph.addEdge(new LongWritable(2), new LongWritable(1), NullWritable.get()); + graph.addEdge(new LongWritable(2), new LongWritable(3), NullWritable.get()); + graph.addEdge(new LongWritable(3), new LongWritable(2), NullWritable.get()); + return graph; + } + + public static class DummyObjectWithApiHandle { + + private BlockApiHandle handle; + + public DummyObjectWithApiHandle(BlockApiHandle handle) { + this.handle = handle; + } + + public void doSomethingAtWorker() { + // checking that the handles have been set + assertFalse(handle.isMasterApiSet()); + assertFalse(handle.isWorkerContextReceiveApiSet()); + assertFalse(handle.isWorkerContextSendApiSet()); + assertEquals(1, handle.getWorkerReceiveApi().getWorkerCount()); + assertEquals(0, handle.getWorkerSendApi().getMyWorkerIndex()); + } + + public void doSomethingAtWorkerContext() { + // checking 
that the handles have been set + assertFalse(handle.isMasterApiSet()); + assertFalse(handle.isWorkerReceiveApiSet()); + assertFalse(handle.isWorkerSendApiSet()); + assertEquals(1, handle.getWorkerContextReceiveApi().getWorkerCount()); + assertEquals(0, handle.getWorkerContextSendApi().getMyWorkerIndex()); + } + + public void doSomethingAtMaster() { + // checking that the handles have been set + assertEquals(1, handle.getMasterApi().getWorkerCount()); + assertFalse(handle.isWorkerReceiveApiSet()); + assertFalse(handle.isWorkerSendApiSet()); + assertFalse(handle.isWorkerContextReceiveApiSet()); + assertFalse(handle.isWorkerContextSendApiSet()); + } + } + + public static class TestPiece extends DefaultParentPiece<WritableComparable, + LongWritable, Writable, NullWritable, Object, DoubleWritable, Object> { + + private DummyObjectWithApiHandle object; + + public TestPiece(DummyObjectWithApiHandle object) { + this.object = object; + } + + @Override + public VertexSender<WritableComparable, LongWritable, Writable> + getVertexSender(final BlockWorkerSendApi<WritableComparable, LongWritable, + Writable, NullWritable> workerApi, Object executionStage) { + return new InnerVertexSender() { + @Override + public void vertexSend( + Vertex<WritableComparable, LongWritable, Writable> vertex) { + object.doSomethingAtWorker(); + } + }; + } + + @Override + public VertexReceiver<WritableComparable, LongWritable, Writable, + NullWritable> getVertexReceiver( + BlockWorkerReceiveApi<WritableComparable> workerApi, + Object executionStage) { + return new InnerVertexReceiver() { + @Override + public void vertexReceive( + Vertex<WritableComparable, LongWritable, Writable> vertex, + Iterable<NullWritable> messages) { + object.doSomethingAtWorker(); + } + }; + } + + public void workerContextSend(BlockWorkerContextSendApi<WritableComparable, + DoubleWritable> workerContextApi, Object executionStage, + Writable workerValue) { + object.doSomethingAtWorkerContext(); + } + + /** + * Override to have 
worker context receive computation. + * + * Called once per worker, before all vertices are going to be processed + * with getVertexReceiver. + */ + public void workerContextReceive( + BlockWorkerContextReceiveApi workerContextApi, Object executionStage, + Object workerValue, List<DoubleWritable> workerMessages) { + object.doSomethingAtWorkerContext(); + } + + @Override + public void masterCompute(BlockMasterApi masterApi, Object executionStage) { + object.doSomethingAtMaster(); + } + + @Override + protected Class<NullWritable> getMessageClass() { + return NullWritable.class; + } + } + + @Test + public void testBlockApiHandle() { + TestGraph<LongWritable, LongWritable, NullWritable> graph = + createTestGraph(); + + final BlockApiHandle handle = new BlockApiHandle(); + final DefaultParentPiece piece = + new TestPiece(new DummyObjectWithApiHandle(handle)); + + Block block = new BlockWithApiHandle() { + @Override + public Iterator<AbstractPiece> iterator() { + return piece.iterator(); + } + + @Override + public void forAllPossiblePieces(Consumer<AbstractPiece> consumer) { + piece.forAllPossiblePieces(consumer); + } + + @Override + public BlockApiHandle getBlockApiHandle() { + return handle; + } + }; + + BlockUtils.BLOCK_WORKER_CONTEXT_VALUE_CLASS.set( + graph.getConf(), Object.class); + LocalBlockRunner.runBlock(graph, block, new Object()); + } +}
