Updated Branches: refs/heads/trunk b88292dbf -> d8baf4b80
GIRAPH-656: Input from multiple tables doesn't work with multithreading (majakabiljo) Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/d8baf4b8 Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/d8baf4b8 Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/d8baf4b8 Branch: refs/heads/trunk Commit: d8baf4b80b4c98073bfd82d29ccf46b11847ee4e Parents: b88292d Author: Maja Kabiljo <[email protected]> Authored: Mon May 6 13:27:27 2013 -0700 Committer: Maja Kabiljo <[email protected]> Committed: Mon May 6 13:28:16 2013 -0700 ---------------------------------------------------------------------- CHANGELOG | 3 + .../io/formats/multi/MultiEdgeInputFormat.java | 37 +++++- .../io/formats/multi/MultiVertexInputFormat.java | 37 +++++- .../giraph/io/internal/WrappedEdgeInputFormat.java | 48 +------- .../giraph/io/internal/WrappedEdgeReader.java | 96 +++++++++++++++ .../io/internal/WrappedVertexInputFormat.java | 41 +------ .../giraph/io/internal/WrappedVertexReader.java | 93 ++++++++++++++ 7 files changed, 255 insertions(+), 100 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/giraph/blob/d8baf4b8/CHANGELOG ---------------------------------------------------------------------- diff --git a/CHANGELOG b/CHANGELOG index 8a2b9eb..ea10fac 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,6 +1,9 @@ Giraph Change Log Release 1.0.1 - unreleased + GIRAPH-656: Input from multiple tables doesn't work with multithreading + (majakabiljo) + GIRAPH-657: Remove unused reuseIncomingEdgeObjects option (apresta) GIRAPH-592: YourKit profiler (nitay) http://git-wip-us.apache.org/repos/asf/giraph/blob/d8baf4b8/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/MultiEdgeInputFormat.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/MultiEdgeInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/MultiEdgeInputFormat.java index 113b3bc..c377fbc 100644 --- a/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/MultiEdgeInputFormat.java +++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/MultiEdgeInputFormat.java @@ -21,6 +21,7 @@ package org.apache.giraph.io.formats.multi; import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; import org.apache.giraph.io.EdgeInputFormat; import org.apache.giraph.io.EdgeReader; +import org.apache.giraph.io.internal.WrappedEdgeReader; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.InputSplit; @@ -61,11 +62,29 @@ public class MultiEdgeInputFormat<I extends WritableComparable, public EdgeReader<I, E> createEdgeReader(InputSplit inputSplit, TaskAttemptContext context) throws IOException { if (inputSplit instanceof InputSplitWithInputFormatIndex) { - InputSplitWithInputFormatIndex split = - (InputSplitWithInputFormatIndex) inputSplit; - EdgeInputFormat<I, E> edgeInputFormat = - edgeInputFormats.get(split.getInputFormatIndex()); - return edgeInputFormat.createEdgeReader(split.getSplit(), context); + // When multithreaded input is used we need to make sure other threads + // don't change context's configuration while we use it + synchronized (context) { + InputSplitWithInputFormatIndex split = + (InputSplitWithInputFormatIndex) inputSplit; + EdgeInputFormat<I, E> edgeInputFormat = + edgeInputFormats.get(split.getInputFormatIndex()); + EdgeReader<I, E> edgeReader = + edgeInputFormat.createEdgeReader(split.getSplit(), context); + return new WrappedEdgeReader<I, E>( + edgeReader, edgeInputFormat.getConf()) { + @Override + public void initialize(InputSplit inputSplit, + TaskAttemptContext context) throws IOException, + InterruptedException { + // When multithreaded input is used we need to make sure other + // threads don't change context's configuration while we use it + synchronized (context) { + super.initialize(inputSplit, context); + } + } + }; + } } else { throw new IllegalStateException("createEdgeReader: Got InputSplit which" + " was not created by this class: " + inputSplit.getClass().getName()); @@ -75,8 +94,12 @@ public class MultiEdgeInputFormat<I extends WritableComparable, @Override public List<InputSplit> getSplits(JobContext context, int minSplitCountHint) throws IOException, InterruptedException { - return - MultiInputUtils.getSplits(context, minSplitCountHint, edgeInputFormats); + // When multithreaded input is used we need to make sure other threads don't + // change context's configuration while we use it + synchronized (context) { + return MultiInputUtils.getSplits( + context, minSplitCountHint, edgeInputFormats); + } } @Override http://git-wip-us.apache.org/repos/asf/giraph/blob/d8baf4b8/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/MultiVertexInputFormat.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/MultiVertexInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/MultiVertexInputFormat.java index 631a451..72929d9 100644 --- a/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/MultiVertexInputFormat.java +++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/multi/MultiVertexInputFormat.java @@ -21,6 +21,7 @@ package org.apache.giraph.io.formats.multi; import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; import org.apache.giraph.io.VertexInputFormat; import org.apache.giraph.io.VertexReader; +import org.apache.giraph.io.internal.WrappedVertexReader; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.InputSplit; @@ -62,11 +63,29 @@ public class MultiVertexInputFormat<I extends WritableComparable, public VertexReader<I, V, E> createVertexReader(InputSplit inputSplit, TaskAttemptContext context) throws IOException { if (inputSplit instanceof InputSplitWithInputFormatIndex) { - InputSplitWithInputFormatIndex split = - (InputSplitWithInputFormatIndex) inputSplit; - VertexInputFormat<I, V, E> vertexInputFormat = - vertexInputFormats.get(split.getInputFormatIndex()); - return vertexInputFormat.createVertexReader(split.getSplit(), context); + // When multithreaded input is used we need to make sure other threads + // don't change context's configuration while we use it + synchronized (context) { + InputSplitWithInputFormatIndex split = + (InputSplitWithInputFormatIndex) inputSplit; + VertexInputFormat<I, V, E> vertexInputFormat = + vertexInputFormats.get(split.getInputFormatIndex()); + VertexReader<I, V, E> vertexReader = + vertexInputFormat.createVertexReader(split.getSplit(), context); + return new WrappedVertexReader<I, V, E>( + vertexReader, vertexInputFormat.getConf()) { + @Override + public void initialize(InputSplit inputSplit, + TaskAttemptContext context) throws IOException, + InterruptedException { + // When multithreaded input is used we need to make sure other + // threads don't change context's configuration while we use it + synchronized (context) { + super.initialize(inputSplit, context); + } + } + }; + } } else { throw new IllegalStateException("createVertexReader: Got InputSplit " + "which was not created by this class: " + @@ -77,8 +96,12 @@ public class MultiVertexInputFormat<I extends WritableComparable, @Override public List<InputSplit> getSplits(JobContext context, int minSplitCountHint) throws IOException, InterruptedException { - return MultiInputUtils.getSplits(context, minSplitCountHint, - vertexInputFormats); + // When multithreaded input is used we need to make sure other threads don't + // change context's configuration while we use it + synchronized (context) { + return MultiInputUtils.getSplits( + context, minSplitCountHint, vertexInputFormats); + } } @Override http://git-wip-us.apache.org/repos/asf/giraph/blob/d8baf4b8/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedEdgeInputFormat.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedEdgeInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedEdgeInputFormat.java index 928b975..9c209dd 100644 --- a/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedEdgeInputFormat.java +++ b/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedEdgeInputFormat.java @@ -18,8 +18,6 @@ package org.apache.giraph.io.internal; -import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; -import org.apache.giraph.edge.Edge; import org.apache.giraph.io.EdgeInputFormat; import org.apache.giraph.io.EdgeReader; import org.apache.hadoop.io.Writable; @@ -70,51 +68,9 @@ public class WrappedEdgeInputFormat<I extends WritableComparable, public EdgeReader<I, E> createEdgeReader(InputSplit split, TaskAttemptContext context) throws IOException { getConf().updateConfiguration(context.getConfiguration()); - final EdgeReader<I, E> edgeReader = + EdgeReader<I, E> edgeReader = originalInputFormat.createEdgeReader(split, context); - return new EdgeReader<I, E>() { - @Override - public void setConf( - ImmutableClassesGiraphConfiguration<I, Writable, E, Writable> conf) { - WrappedEdgeInputFormat.this.getConf().updateConfiguration(conf); - super.setConf(conf); - edgeReader.setConf(conf); - } - - @Override - public void initialize(InputSplit inputSplit, - TaskAttemptContext context) throws IOException, InterruptedException { - WrappedEdgeInputFormat.this.getConf().updateConfiguration( - context.getConfiguration()); - edgeReader.initialize(inputSplit, context); - } - - @Override - public boolean nextEdge() throws IOException, InterruptedException { - return edgeReader.nextEdge(); - } - - @Override - public I getCurrentSourceId() throws IOException, InterruptedException { - return edgeReader.getCurrentSourceId(); - } - - @Override - public Edge<I, E> getCurrentEdge() throws IOException, - InterruptedException { - return edgeReader.getCurrentEdge(); - } - - @Override - public void close() throws IOException { - edgeReader.close(); - } - - @Override - public float getProgress() throws IOException, InterruptedException { - return edgeReader.getProgress(); - } - }; + return new WrappedEdgeReader<I, E>(edgeReader, getConf()); } @Override http://git-wip-us.apache.org/repos/asf/giraph/blob/d8baf4b8/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedEdgeReader.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedEdgeReader.java b/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedEdgeReader.java new file mode 100644 index 0000000..c0a2cd1 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedEdgeReader.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.io.internal; + +import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; +import org.apache.giraph.edge.Edge; +import org.apache.giraph.io.EdgeReader; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +import java.io.IOException; + +/** + * For internal use only. + * + * Wraps {@link EdgeReader} to make sure proper configuration + * parameters are passed around, that parameters set in original + * configuration are available in methods of this reader + * + * @param <I> Vertex id + * @param <E> Edge data + */ +public class WrappedEdgeReader<I extends WritableComparable, + E extends Writable> extends EdgeReader<I, E> { + /** EdgeReader to delegate the methods to */ + private final EdgeReader<I, E> baseEdgeReader; + + /** + * Constructor + * + * @param baseEdgeReader EdgeReader to delegate all the methods to + * @param conf Configuration + */ + public WrappedEdgeReader(EdgeReader<I, E> baseEdgeReader, + ImmutableClassesGiraphConfiguration<I, Writable, E, Writable> conf) { + this.baseEdgeReader = baseEdgeReader; + super.setConf(conf); + baseEdgeReader.setConf(conf); + } + + @Override + public void setConf( + ImmutableClassesGiraphConfiguration<I, Writable, E, Writable> conf) { + // We don't want to use external configuration + } + + @Override + public void initialize(InputSplit inputSplit, + TaskAttemptContext context) throws IOException, InterruptedException { + getConf().updateConfiguration(context.getConfiguration()); + baseEdgeReader.initialize(inputSplit, context); + } + + @Override + public boolean nextEdge() throws IOException, InterruptedException { + return baseEdgeReader.nextEdge(); + } + + @Override + public I getCurrentSourceId() throws IOException, InterruptedException { + return baseEdgeReader.getCurrentSourceId(); + } + + @Override + public Edge<I, E> getCurrentEdge() throws IOException, InterruptedException { + return baseEdgeReader.getCurrentEdge(); + } + + @Override + public void close() throws IOException { + baseEdgeReader.close(); + } + + @Override + public float getProgress() throws IOException, InterruptedException { + return baseEdgeReader.getProgress(); + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/d8baf4b8/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedVertexInputFormat.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedVertexInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedVertexInputFormat.java index ed606e3..f5379c1 100644 --- a/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedVertexInputFormat.java +++ b/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedVertexInputFormat.java @@ -18,8 +18,6 @@ package org.apache.giraph.io.internal; -import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; -import org.apache.giraph.graph.Vertex; import org.apache.giraph.io.VertexInputFormat; import org.apache.giraph.io.VertexReader; import org.apache.hadoop.io.Writable; @@ -74,44 +72,7 @@ public class WrappedVertexInputFormat<I extends WritableComparable, getConf().updateConfiguration(context.getConfiguration()); final VertexReader<I, V, E> vertexReader = originalInputFormat.createVertexReader(split, context); - return new VertexReader<I, V, E>() { - @Override - public void setConf( - ImmutableClassesGiraphConfiguration<I, V, E, Writable> conf) { - WrappedVertexInputFormat.this.getConf().updateConfiguration(conf); - super.setConf(conf); - vertexReader.setConf(conf); - } - - @Override - public void initialize(InputSplit inputSplit, - TaskAttemptContext context) throws IOException, InterruptedException { - WrappedVertexInputFormat.this.getConf().updateConfiguration( - context.getConfiguration()); - vertexReader.initialize(inputSplit, context); - } - - @Override - public boolean nextVertex() throws IOException, InterruptedException { - return vertexReader.nextVertex(); - } - - @Override - public Vertex<I, V, E, ?> getCurrentVertex() throws IOException, - InterruptedException { - return vertexReader.getCurrentVertex(); - } - - @Override - public void close() throws IOException { - vertexReader.close(); - } - - @Override - public float getProgress() throws IOException, InterruptedException { - return vertexReader.getProgress(); - } - }; + return new WrappedVertexReader<I, V, E>(vertexReader, getConf()); } @Override http://git-wip-us.apache.org/repos/asf/giraph/blob/d8baf4b8/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedVertexReader.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedVertexReader.java b/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedVertexReader.java new file mode 100644 index 0000000..3a8ac50 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedVertexReader.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.io.internal; + +import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; +import org.apache.giraph.graph.Vertex; +import org.apache.giraph.io.VertexReader; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +import java.io.IOException; + +/** + * For internal use only. + * + * Wraps {@link VertexReader} to make sure proper configuration + * parameters are passed around, that parameters set in original + * configuration are available in methods of this reader + * + * @param <I> Vertex id + * @param <V> Vertex data + * @param <E> Edge data + */ +public class WrappedVertexReader<I extends WritableComparable, + V extends Writable, E extends Writable> extends VertexReader<I, V, E> { + /** VertexReader to delegate the methods to */ + private final VertexReader<I, V, E> baseVertexReader; + + /** + * Constructor + * + * @param baseVertexReader VertexReader to delegate all the methods to + * @param conf Configuration + */ + public WrappedVertexReader(VertexReader<I, V, E> baseVertexReader, + ImmutableClassesGiraphConfiguration<I, V, E, Writable> conf) { + this.baseVertexReader = baseVertexReader; + super.setConf(conf); + baseVertexReader.setConf(conf); + } + + @Override + public void setConf( + ImmutableClassesGiraphConfiguration<I, V, E, Writable> conf) { + // We don't want to use external configuration + } + + @Override + public void initialize(InputSplit inputSplit, + TaskAttemptContext context) throws IOException, InterruptedException { + getConf().updateConfiguration(context.getConfiguration()); + baseVertexReader.initialize(inputSplit, context); + } + + @Override + public boolean nextVertex() throws IOException, InterruptedException { + return baseVertexReader.nextVertex(); + } + + @Override + public Vertex<I, V, E, ?> getCurrentVertex() throws IOException, + InterruptedException { + return baseVertexReader.getCurrentVertex(); + } + + @Override + public void close() throws IOException { + baseVertexReader.close(); + } + + @Override + public float getProgress() throws IOException, InterruptedException { + return baseVertexReader.getProgress(); + } +}
