Repository: giraph Updated Branches: refs/heads/trunk 572ca0630 -> ffed230ba
[GIRAPH 1013] Apply @edunov fix for block output Summary: Apply fix: https://phabricator.fb.com/D2141200 Test Plan: mvn clean install Reviewers: maja.kabiljo, sergey.edunov, dionysis.logothetis Reviewed By: dionysis.logothetis Differential Revision: https://reviews.facebook.net/D40395 Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/ffed230b Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/ffed230b Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/ffed230b Branch: refs/heads/trunk Commit: ffed230ba8958ba3bc27310b71b0c6df589c3e63 Parents: 572ca06 Author: Igor Kabiljo <[email protected]> Authored: Thu Jun 18 16:52:22 2015 -0700 Committer: Igor Kabiljo <[email protected]> Committed: Mon Jun 22 17:01:52 2015 -0700 ---------------------------------------------------------------------- .../api/giraph/BlockMasterCompute.java | 3 +-- .../framework/output/BlockOutputHandle.java | 20 ++++++++++++++++---- 2 files changed, 17 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/giraph/blob/ffed230b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/giraph/BlockMasterCompute.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/giraph/BlockMasterCompute.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/giraph/BlockMasterCompute.java index 69cf9f8..1a8d54d 100644 --- a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/giraph/BlockMasterCompute.java +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/giraph/BlockMasterCompute.java @@ -66,7 +66,6 @@ public final class BlockMasterCompute<S> extends MasterCompute { object.readFields(in); blockMasterLogic = object.get(); blockMasterLogic.initializeAfterRead(new BlockMasterApiWrapper(this, - new BlockOutputHandle(getContext().getJobID().toString(), - getConf(), getContext()))); + new BlockOutputHandle())); } } http://git-wip-us.apache.org/repos/asf/giraph/blob/ffed230b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/output/BlockOutputHandle.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/output/BlockOutputHandle.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/output/BlockOutputHandle.java index fd38520..76fd768 100644 --- a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/output/BlockOutputHandle.java +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/output/BlockOutputHandle.java @@ -38,15 +38,19 @@ public class BlockOutputHandle implements BlockOutputApi { private transient Configuration conf; private transient Progressable progressable; private final Map<String, BlockOutputDesc> outputDescMap; - private final Map<String, Queue<BlockOutputWriter>> freeWriters; - private final Map<String, Queue<BlockOutputWriter>> occupiedWriters; + private final Map<String, Queue<BlockOutputWriter>> freeWriters = + new HashMap<>(); + private final Map<String, Queue<BlockOutputWriter>> occupiedWriters = + new HashMap<>(); + + public BlockOutputHandle() { + outputDescMap = null; + } public BlockOutputHandle(String jobIdentifier, Configuration conf, Progressable hadoopProgressable) { outputDescMap = BlockOutputFormat.createInitAndCheckOutputDescsMap( conf, jobIdentifier); - freeWriters = new HashMap<>(); - occupiedWriters = new HashMap<>(); for (String confOption : outputDescMap.keySet()) { freeWriters.put(confOption, new ConcurrentLinkedQueue<BlockOutputWriter>()); @@ -65,11 +69,19 @@ public class BlockOutputHandle implements BlockOutputApi { @Override public <OW extends BlockOutputWriter, OD extends BlockOutputDesc<OW>> OD getOutputDesc(String confOption) { + if (outputDescMap == null) { + throw new IllegalArgumentException( + "Output cannot be used with checkpointing"); + } return (OD) outputDescMap.get(confOption); } @Override public <OW extends BlockOutputWriter> OW getWriter(String confOption) { + if (outputDescMap == null) { + throw new IllegalArgumentException( + "Output cannot be used with checkpointing"); + } OW outputWriter = (OW) freeWriters.get(confOption).poll(); if (outputWriter == null) { outputWriter = (OW) outputDescMap.get(confOption).createOutputWriter(
