Updated Branches: refs/heads/trunk e495238bb -> afb3ecce1
GIRAPH-560: Input filtering (nitay) Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/afb3ecce Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/afb3ecce Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/afb3ecce Branch: refs/heads/trunk Commit: afb3ecce139ab7c4037602ad143b5e6424dc48a2 Parents: e495238 Author: Nitay Joffe <[email protected]> Authored: Mon May 6 20:57:19 2013 -0400 Committer: Nitay Joffe <[email protected]> Committed: Mon May 6 21:00:45 2013 -0400 ---------------------------------------------------------------------- CHANGELOG | 2 + .../java/org/apache/giraph/conf/GiraphClasses.java | 29 ++++ .../apache/giraph/conf/GiraphConfiguration.java | 22 +++ .../org/apache/giraph/conf/GiraphConstants.java | 14 ++ .../conf/ImmutableClassesGiraphConfiguration.java | 40 +++++- .../org/apache/giraph/graph/GraphTaskManager.java | 2 + .../giraph/io/filters/DefaultEdgeInputFilter.java | 39 +++++ .../io/filters/DefaultVertexInputFilter.java | 41 +++++ .../apache/giraph/io/filters/EdgeInputFilter.java | 41 +++++ .../giraph/io/filters/VertexInputFilter.java | 42 +++++ .../org/apache/giraph/io/filters/package-info.java | 21 +++ .../org/apache/giraph/metrics/MetricNames.java | 10 ++ .../giraph/worker/EdgeInputSplitsCallable.java | 42 +++++- .../apache/giraph/worker/InputSplitsCallable.java | 57 +++++++ .../giraph/worker/VertexInputSplitsCallable.java | 34 ++++- .../java/org/apache/giraph/io/TestEdgeInput.java | 30 +--- .../java/org/apache/giraph/io/TestFilters.java | 119 +++++++++++++++ .../giraph/vertices/IntIntNullVertexDoNothing.java | 25 +++ .../apache/giraph/vertices/VertexCountEdges.java | 33 ++++ .../apache/giraph/vertices/VertexDoNothing.java | 33 ++++ 20 files changed, 643 insertions(+), 33 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/giraph/blob/afb3ecce/CHANGELOG 
---------------------------------------------------------------------- diff --git a/CHANGELOG b/CHANGELOG index b8318b4..aa1ce1e 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,6 +1,8 @@ Giraph Change Log Release 1.0.1 - unreleased + GIRAPH-560: Input filtering (nitay) + GIRAPH-621: Website Documentation: Basic Design Document (aching) GIRAPH-658: Remove final modifier from SimpleHiveToEdge.initializeRecords http://git-wip-us.apache.org/repos/asf/giraph/blob/afb3ecce/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java index 4a0e8f7..10e4975 100644 --- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java +++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java @@ -27,7 +27,11 @@ import org.apache.giraph.graph.DefaultVertexValueFactory; import org.apache.giraph.graph.Vertex; import org.apache.giraph.graph.VertexResolver; import org.apache.giraph.graph.VertexValueFactory; +import org.apache.giraph.io.filters.DefaultEdgeInputFilter; +import org.apache.giraph.io.filters.DefaultVertexInputFilter; +import org.apache.giraph.io.filters.EdgeInputFilter; import org.apache.giraph.io.EdgeInputFormat; +import org.apache.giraph.io.filters.VertexInputFilter; import org.apache.giraph.io.VertexInputFormat; import org.apache.giraph.io.VertexOutputFormat; import org.apache.giraph.master.DefaultMasterCompute; @@ -108,6 +112,12 @@ public class GiraphClasses<I extends WritableComparable, /** Partition class - cached for fast accesss */ protected Class<? extends Partition<I, V, E, M>> partitionClass; + /** Edge Input Filter class */ + protected Class<? extends EdgeInputFilter<I, E>> edgeInputFilterClass; + /** Vertex Input Filter class */ + protected Class<? 
extends VertexInputFilter<I, V, E, M>> + vertexInputFilterClass; + /** * Empty constructor. Initialize with default classes or null. */ @@ -131,6 +141,10 @@ public class GiraphClasses<I extends WritableComparable, masterComputeClass = DefaultMasterCompute.class; partitionClass = (Class<? extends Partition<I, V, E, M>>) (Object) SimplePartition.class; + edgeInputFilterClass = (Class<? extends EdgeInputFilter<I, E>>) + (Object) DefaultEdgeInputFilter.class; + vertexInputFilterClass = (Class<? extends VertexInputFilter<I, V, E, M>>) + (Object) DefaultVertexInputFilter.class; } /** @@ -185,6 +199,11 @@ public class GiraphClasses<I extends WritableComparable, masterComputeClass = MASTER_COMPUTE_CLASS.get(conf); partitionClass = (Class<? extends Partition<I, V, E, M>>) PARTITION_CLASS.get(conf); + + edgeInputFilterClass = (Class<? extends EdgeInputFilter<I, E>>) + EDGE_INPUT_FILTER_CLASS.get(conf); + vertexInputFilterClass = (Class<? extends VertexInputFilter<I, V, E, M>>) + VERTEX_INPUT_FILTER_CLASS.get(conf); } /** @@ -269,6 +288,16 @@ public class GiraphClasses<I extends WritableComparable, return graphPartitionerFactoryClass; } + public Class<? extends EdgeInputFilter<I, E>> + getEdgeInputFilterClass() { + return edgeInputFilterClass; + } + + public Class<? 
extends VertexInputFilter<I, V, E, M>> + getVertexInputFilterClass() { + return vertexInputFilterClass; + } + /** * Check if VertexInputFormat class is set * http://git-wip-us.apache.org/repos/asf/giraph/blob/afb3ecce/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java index 8d74626..754fad9 100644 --- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java +++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java @@ -28,6 +28,8 @@ import org.apache.giraph.graph.VertexValueFactory; import org.apache.giraph.io.EdgeInputFormat; import org.apache.giraph.io.VertexInputFormat; import org.apache.giraph.io.VertexOutputFormat; +import org.apache.giraph.io.filters.EdgeInputFilter; +import org.apache.giraph.io.filters.VertexInputFilter; import org.apache.giraph.job.GiraphJobObserver; import org.apache.giraph.master.MasterCompute; import org.apache.giraph.master.MasterObserver; @@ -106,6 +108,26 @@ public class GiraphConfiguration extends Configuration } /** + * Set the edge input filter class + * + * @param edgeFilterClass class to use + */ + public void setEdgeInputFilterClass( + Class<? extends EdgeInputFilter> edgeFilterClass) { + EDGE_INPUT_FILTER_CLASS.set(this, edgeFilterClass); + } + + /** + * Set the vertex input filter class + * + * @param vertexFilterClass class to use + */ + public void setVertexInputFilterClass( + Class<? 
extends VertexInputFilter> vertexFilterClass) { + VERTEX_INPUT_FILTER_CLASS.set(this, vertexFilterClass); + } + + /** * Get the vertex edges class * * @return vertex edges class http://git-wip-us.apache.org/repos/asf/giraph/blob/afb3ecce/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java index 54a40b7..bbf50e5 100644 --- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java +++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java @@ -30,6 +30,10 @@ import org.apache.giraph.graph.VertexValueFactory; import org.apache.giraph.io.EdgeInputFormat; import org.apache.giraph.io.VertexInputFormat; import org.apache.giraph.io.VertexOutputFormat; +import org.apache.giraph.io.filters.DefaultEdgeInputFilter; +import org.apache.giraph.io.filters.DefaultVertexInputFilter; +import org.apache.giraph.io.filters.EdgeInputFilter; +import org.apache.giraph.io.filters.VertexInputFilter; import org.apache.giraph.job.DefaultJobObserver; import org.apache.giraph.job.GiraphJobObserver; import org.apache.giraph.master.DefaultMasterCompute; @@ -120,6 +124,16 @@ public interface GiraphConstants { ClassConfOption.create("giraph.edgeInputFormatClass", null, EdgeInputFormat.class); + /** EdgeInputFilter class */ + ClassConfOption<EdgeInputFilter> EDGE_INPUT_FILTER_CLASS = + ClassConfOption.create("giraph.edgeInputFilterClass", + DefaultEdgeInputFilter.class, EdgeInputFilter.class); + + /** VertexInputFilter class */ + ClassConfOption<VertexInputFilter> VERTEX_INPUT_FILTER_CLASS = + ClassConfOption.create("giraph.vertexInputFilterClass", + DefaultVertexInputFilter.class, VertexInputFilter.class); + /** VertexOutputFormat class */ ClassConfOption<VertexOutputFormat> VERTEX_OUTPUT_FORMAT_CLASS = 
ClassConfOption.create("giraph.vertexOutputFormatClass", null, http://git-wip-us.apache.org/repos/asf/giraph/blob/afb3ecce/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java index f992b37..a9add4f 100644 --- a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java +++ b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java @@ -31,6 +31,8 @@ import org.apache.giraph.graph.VertexValueFactory; import org.apache.giraph.io.EdgeInputFormat; import org.apache.giraph.io.VertexInputFormat; import org.apache.giraph.io.VertexOutputFormat; +import org.apache.giraph.io.filters.EdgeInputFilter; +import org.apache.giraph.io.filters.VertexInputFilter; import org.apache.giraph.io.internal.WrappedEdgeInputFormat; import org.apache.giraph.io.internal.WrappedVertexInputFormat; import org.apache.giraph.io.internal.WrappedVertexOutputFormat; @@ -93,7 +95,7 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable, */ public ImmutableClassesGiraphConfiguration(Configuration conf) { super(conf); - classes = new GiraphClasses(conf); + classes = new GiraphClasses<I, V, E, M>(conf); useUnsafeSerialization = USE_UNSAFE_SERIALIZATION.get(this); try { vertexValueFactory = (VertexValueFactory<V>) @@ -138,6 +140,42 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable, } /** + * Get the edge input filter class + * + * @return EdgeInputFilter class + */ + public Class<?
extends EdgeInputFilter<I, E>> + getEdgeInputFilterClass() { + return classes.getEdgeInputFilterClass(); + } + + /** + * Get the edge input filter to use + * @return EdgeInputFilter + */ + public EdgeInputFilter getEdgeInputFilter() { + return ReflectionUtils.newInstance(getEdgeInputFilterClass(), this); + } + + /** + * Get the vertex input filter class + * + * @return VertexInputFilter class + */ + public Class<? extends VertexInputFilter<I, V, E, M>> + getVertexInputFilterClass() { + return classes.getVertexInputFilterClass(); + } + + /** + * Get the vertex input filter to use + * @return VertexInputFilter + */ + public VertexInputFilter getVertexInputFilter() { + return ReflectionUtils.newInstance(getVertexInputFilterClass(), this); + } + + /** * Get the user's subclassed * {@link org.apache.giraph.partition.GraphPartitionerFactory}. * http://git-wip-us.apache.org/repos/asf/giraph/blob/afb3ecce/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java index 97cf55d..5dbf977 100644 --- a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java +++ b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java @@ -42,6 +42,7 @@ import org.apache.giraph.utils.MemoryUtils; import org.apache.giraph.utils.ProgressableUtils; import org.apache.giraph.utils.ReflectionUtils; import org.apache.giraph.worker.BspServiceWorker; +import org.apache.giraph.worker.InputSplitsCallable; import org.apache.giraph.worker.WorkerAggregatorUsage; import org.apache.giraph.worker.WorkerContext; import org.apache.giraph.worker.WorkerObserver; @@ -337,6 +338,7 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable, GiraphMetrics.get().addSuperstepResetObserver(this); initJobMetrics(); 
MemoryUtils.initMetrics(); + InputSplitsCallable.initMetrics(); } /** http://git-wip-us.apache.org/repos/asf/giraph/blob/afb3ecce/giraph-core/src/main/java/org/apache/giraph/io/filters/DefaultEdgeInputFilter.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/io/filters/DefaultEdgeInputFilter.java b/giraph-core/src/main/java/org/apache/giraph/io/filters/DefaultEdgeInputFilter.java new file mode 100644 index 0000000..ad52496 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/io/filters/DefaultEdgeInputFilter.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.io.filters; + +import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable; +import org.apache.giraph.edge.Edge; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; + +/** + * Default edge filter that lets in all edges. 
+ * + * @param <I> Vertex ID + * @param <E> Edge Value + */ +public class DefaultEdgeInputFilter<I extends WritableComparable, + E extends Writable> + extends DefaultImmutableClassesGiraphConfigurable<I, Writable, E, Writable> + implements EdgeInputFilter<I, E> { + @Override + public boolean dropEdge(I sourceId, Edge<I, E> edge) { + return false; + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/afb3ecce/giraph-core/src/main/java/org/apache/giraph/io/filters/DefaultVertexInputFilter.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/io/filters/DefaultVertexInputFilter.java b/giraph-core/src/main/java/org/apache/giraph/io/filters/DefaultVertexInputFilter.java new file mode 100644 index 0000000..2976cbc --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/io/filters/DefaultVertexInputFilter.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.giraph.io.filters; + +import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable; +import org.apache.giraph.graph.Vertex; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; + +/** + * Default vertex filter that lets in all vertices. + * + * @param <I> Vertex ID + * @param <V> Vertex Value + * @param <E> Edge Value + * @param <M> Message Value + */ +public class DefaultVertexInputFilter<I extends WritableComparable, + V extends Writable, E extends Writable, M extends Writable> + extends DefaultImmutableClassesGiraphConfigurable<I, V, E, M> + implements VertexInputFilter<I, V, E, M> { + @Override + public boolean dropVertex(Vertex<I, V, E, M> vertex) { + return false; + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/afb3ecce/giraph-core/src/main/java/org/apache/giraph/io/filters/EdgeInputFilter.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/io/filters/EdgeInputFilter.java b/giraph-core/src/main/java/org/apache/giraph/io/filters/EdgeInputFilter.java new file mode 100644 index 0000000..fa69932 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/io/filters/EdgeInputFilter.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.io.filters; + +import org.apache.giraph.edge.Edge; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; + +/** + * Filters edges on input. + * + * @param <I> Vertex ID + * @param <E> Edge Value + */ +public interface EdgeInputFilter<I extends WritableComparable, + E extends Writable> { + /** + * Whether to drop this edge + * + * @param sourceId ID of source of edge + * @param edge to check + * @return true if we should drop the edge + */ + boolean dropEdge(I sourceId, Edge<I, E> edge); +} + http://git-wip-us.apache.org/repos/asf/giraph/blob/afb3ecce/giraph-core/src/main/java/org/apache/giraph/io/filters/VertexInputFilter.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/io/filters/VertexInputFilter.java b/giraph-core/src/main/java/org/apache/giraph/io/filters/VertexInputFilter.java new file mode 100644 index 0000000..d9af103 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/io/filters/VertexInputFilter.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.io.filters; + +import org.apache.giraph.graph.Vertex; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; + +/** + * Filters vertices on input. + * + * @param <I> Vertex ID + * @param <V> Vertex Value + * @param <E> Edge Value + * @param <M> Message Value + */ +public interface VertexInputFilter<I extends WritableComparable, + V extends Writable, E extends Writable, M extends Writable> { + /** + * Whether to drop a vertex on input. + * + * @param vertex to check + * @return true if we should drop vertex + */ + boolean dropVertex(Vertex<I, V, E, M> vertex); +} + http://git-wip-us.apache.org/repos/asf/giraph/blob/afb3ecce/giraph-core/src/main/java/org/apache/giraph/io/filters/package-info.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/io/filters/package-info.java b/giraph-core/src/main/java/org/apache/giraph/io/filters/package-info.java new file mode 100644 index 0000000..a8f901a --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/io/filters/package-info.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Input/Output filters. + */ +package org.apache.giraph.io.filters; http://git-wip-us.apache.org/repos/asf/giraph/blob/afb3ecce/giraph-core/src/main/java/org/apache/giraph/metrics/MetricNames.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/metrics/MetricNames.java b/giraph-core/src/main/java/org/apache/giraph/metrics/MetricNames.java index 52a3d15..cc237ac 100644 --- a/giraph-core/src/main/java/org/apache/giraph/metrics/MetricNames.java +++ b/giraph-core/src/main/java/org/apache/giraph/metrics/MetricNames.java @@ -78,4 +78,14 @@ public interface MetricNames { /** PercentGauge of memory free */ String MEMORY_FREE_PERCENT = "memory-free-pct"; + + /** Total edges filtered out */ + String EDGES_FILTERED = "edges-filtered"; + /** Percent of edges filtered out */ + String EDGES_FILTERED_PCT = "edges-filtered-pct"; + + /** Total vertices filtered */ + String VERTICES_FILTERED = "vertices-filtered"; + /** Percent of vertices filtered out */ + String VERTICES_FILTERED_PCT = "vertices-filtered-pct"; } http://git-wip-us.apache.org/repos/asf/giraph/blob/afb3ecce/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java b/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java index 83fe5ea..351a114 100644 --- a/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java +++ b/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java @@ -24,6 +24,7 @@ import org.apache.giraph.graph.GraphState; import org.apache.giraph.graph.VertexEdgeCount; import org.apache.giraph.io.EdgeInputFormat; import 
org.apache.giraph.io.EdgeReader; +import
org.apache.giraph.io.filters.EdgeInputFilter; import org.apache.giraph.utils.LoggerUtils; import org.apache.giraph.utils.MemoryUtils; import org.apache.giraph.zk.ZooKeeperExt; @@ -34,6 +35,7 @@ import org.apache.hadoop.mapreduce.Mapper; import org.apache.log4j.Level; import org.apache.log4j.Logger; +import com.yammer.metrics.core.Counter; import com.yammer.metrics.core.Meter; import java.io.IOException; @@ -52,7 +54,9 @@ public class EdgeInputSplitsCallable<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable> extends InputSplitsCallable<I, V, E, M> { /** How often to update metrics and print info */ - public static final int VERTICES_UPDATE_PERIOD = 1000000; + public static final int EDGES_UPDATE_PERIOD = 1000000; + /** How often to update filtered metrics */ + public static final int EDGES_FILTERED_UPDATE_PERIOD = 10000; /** Class logger */ private static final Logger LOG = Logger.getLogger( @@ -62,9 +66,14 @@ public class EdgeInputSplitsCallable<I extends WritableComparable, /** Input split max edges (-1 denotes all) */ private final long inputSplitMaxEdges; + /** Filter to use */ + private final EdgeInputFilter<I, E> edgeInputFilter; + // Metrics /** edges loaded meter across all readers */ private final Meter totalEdgesMeter; + /** edges filtered out by user */ + private final Counter totalEdgesFiltered; /** * Constructor. 
@@ -90,9 +99,11 @@ public class EdgeInputSplitsCallable<I extends WritableComparable, this.edgeInputFormat = edgeInputFormat; inputSplitMaxEdges = configuration.getInputSplitMaxEdges(); + edgeInputFilter = configuration.getEdgeInputFilter(); // Initialize Metrics totalEdgesMeter = getTotalEdgesLoadedMeter(); + totalEdgesFiltered = getTotalEdgesFilteredCounter(); } @Override @@ -121,7 +132,10 @@ public class EdgeInputSplitsCallable<I extends WritableComparable, (ImmutableClassesGiraphConfiguration<I, Writable, E, Writable>) configuration); edgeReader.initialize(inputSplit, context); + long inputSplitEdgesLoaded = 0; + long inputSplitEdgesFiltered = 0; + while (edgeReader.nextEdge()) { I sourceId = edgeReader.getCurrentSourceId(); Edge<I, E> readerEdge = edgeReader.getCurrentEdge(); @@ -141,14 +155,24 @@ public class EdgeInputSplitsCallable<I extends WritableComparable, "without a value! - " + readerEdge); } - graphState.getWorkerClientRequestProcessor().sendEdgeRequest( - sourceId, readerEdge); - context.progress(); // do this before potential data transfer ++inputSplitEdgesLoaded; - // Update status every VERTICES_UPDATE_PERIOD edges - if (inputSplitEdgesLoaded % VERTICES_UPDATE_PERIOD == 0) { - totalEdgesMeter.mark(VERTICES_UPDATE_PERIOD); + if (edgeInputFilter.dropEdge(sourceId, readerEdge)) { + ++inputSplitEdgesFiltered; + if (inputSplitEdgesFiltered % EDGES_FILTERED_UPDATE_PERIOD == 0) { + totalEdgesFiltered.inc(inputSplitEdgesFiltered); + inputSplitEdgesFiltered = 0; + } + continue; + } + + graphState.getWorkerClientRequestProcessor().sendEdgeRequest(sourceId, + readerEdge); + context.progress(); // do this before potential data transfer + + // Update status every EDGES_UPDATE_PERIOD edges + if (inputSplitEdgesLoaded % EDGES_UPDATE_PERIOD == 0) { + totalEdgesMeter.mark(EDGES_UPDATE_PERIOD); LoggerUtils.setStatusAndLog(context, LOG, Level.INFO, "readEdgeInputSplit: Loaded " + totalEdgesMeter.count() + " edges at " + @@ -169,6 +193,10 @@ public class 
EdgeInputSplitsCallable<I extends WritableComparable, } } edgeReader.close(); + + totalEdgesFiltered.inc(inputSplitEdgesFiltered); + totalEdgesMeter.mark(inputSplitEdgesLoaded % EDGES_UPDATE_PERIOD); + return new VertexEdgeCount(0, inputSplitEdgesLoaded); } } http://git-wip-us.apache.org/repos/asf/giraph/blob/afb3ecce/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java b/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java index f7a8340..a8298c5 100644 --- a/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java +++ b/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java @@ -25,7 +25,9 @@ import org.apache.giraph.graph.GraphState; import org.apache.giraph.graph.VertexEdgeCount; import org.apache.giraph.io.GiraphInputFormat; import org.apache.giraph.metrics.GiraphMetrics; +import org.apache.giraph.metrics.GiraphMetricsRegistry; import org.apache.giraph.metrics.MeterDesc; +import org.apache.giraph.metrics.MetricNames; import org.apache.giraph.time.SystemTime; import org.apache.giraph.time.Time; import org.apache.giraph.time.Times; @@ -38,7 +40,9 @@ import org.apache.hadoop.mapreduce.Mapper; import org.apache.log4j.Logger; import org.apache.zookeeper.KeeperException; +import com.yammer.metrics.core.Counter; import com.yammer.metrics.core.Meter; +import com.yammer.metrics.util.PercentGauge; import java.io.ByteArrayInputStream; import java.io.DataInputStream; @@ -135,6 +139,16 @@ public abstract class InputSplitsCallable<I extends WritableComparable, } /** + * Get Counter tracking edges filtered + * + * @return Counter tracking edges filtered + */ + public static Counter getTotalEdgesFilteredCounter() { + return GiraphMetrics.get().perJobRequired() + .getCounter(MetricNames.EDGES_FILTERED); + } + + /** * Get Meter tracking 
number of vertices loaded. * * @return Meter for vertices loaded @@ -145,6 +159,49 @@ public abstract class InputSplitsCallable<I extends WritableComparable, } /** + * Get Counter tracking vertices filtered + * + * @return Counter tracking vertices filtered + */ + public static Counter getTotalVerticesFilteredCounter() { + return GiraphMetrics.get().perJobRequired() + .getCounter(MetricNames.VERTICES_FILTERED); + } + + /** + * Initialize metrics used by this class and its subclasses. + */ + public static void initMetrics() { + GiraphMetricsRegistry metrics = GiraphMetrics.get().perJobRequired(); + + final Counter edgesFiltered = getTotalEdgesFilteredCounter(); + final Meter edgesLoaded = getTotalEdgesLoadedMeter(); + + metrics.getGauge(MetricNames.EDGES_FILTERED_PCT, new PercentGauge() { + @Override protected double getNumerator() { + return edgesFiltered.count(); + } + + @Override protected double getDenominator() { + return edgesLoaded.count(); + } + }); + + final Counter verticesFiltered = getTotalVerticesFilteredCounter(); + final Meter verticesLoaded = getTotalVerticesLoadedMeter(); + + metrics.getGauge(MetricNames.VERTICES_FILTERED_PCT, new PercentGauge() { + @Override protected double getNumerator() { + return verticesFiltered.count(); + } + + @Override protected double getDenominator() { + return verticesLoaded.count(); + } + }); + } + + /** * Load vertices/edges from the given input split. 
* * @param inputSplit Input split to load http://git-wip-us.apache.org/repos/asf/giraph/blob/afb3ecce/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java b/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java index d32ccaf..1c292ad 100644 --- a/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java +++ b/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java @@ -20,14 +20,15 @@ package org.apache.giraph.worker; import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; import org.apache.giraph.graph.GraphState; +import org.apache.giraph.graph.Vertex; import org.apache.giraph.graph.VertexEdgeCount; import org.apache.giraph.io.GiraphInputFormat; import org.apache.giraph.io.VertexInputFormat; import org.apache.giraph.io.VertexReader; +import org.apache.giraph.io.filters.VertexInputFilter; import org.apache.giraph.partition.PartitionOwner; import org.apache.giraph.utils.LoggerUtils; import org.apache.giraph.utils.MemoryUtils; -import org.apache.giraph.graph.Vertex; import org.apache.giraph.zk.ZooKeeperExt; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; @@ -36,6 +37,7 @@ import org.apache.hadoop.mapreduce.Mapper; import org.apache.log4j.Level; import org.apache.log4j.Logger; +import com.yammer.metrics.core.Counter; import com.yammer.metrics.core.Meter; import java.io.IOException; @@ -55,6 +57,9 @@ public class VertexInputSplitsCallable<I extends WritableComparable, extends InputSplitsCallable<I, V, E, M> { /** How often to update metrics and print info */ public static final int VERTICES_UPDATE_PERIOD = 250000; + /** How often to update filtered out metrics */ + public static final int VERTICES_FILTERED_UPDATE_PERIOD = 2500; + /** Class logger */ 
private static final Logger LOG = Logger.getLogger(VertexInputSplitsCallable.class); @@ -64,10 +69,14 @@ public class VertexInputSplitsCallable<I extends WritableComparable, private final long inputSplitMaxVertices; /** Bsp service worker (only use thread-safe methods) */ private final BspServiceWorker<I, V, E, M> bspServiceWorker; + /** Filter to select which vertices to keep */ + private final VertexInputFilter<I, V, E, M> vertexInputFilter; // Metrics /** number of vertices loaded meter across all readers */ private final Meter totalVerticesMeter; + /** number of vertices filtered out */ + private final Counter totalVerticesFilteredCounter; /** number of edges loaded meter across all readers */ private final Meter totalEdgesMeter; @@ -96,9 +105,11 @@ public class VertexInputSplitsCallable<I extends WritableComparable, inputSplitMaxVertices = configuration.getInputSplitMaxVertices(); this.bspServiceWorker = bspServiceWorker; + vertexInputFilter = configuration.getVertexInputFilter(); // Initialize Metrics totalVerticesMeter = getTotalVerticesLoadedMeter(); + totalVerticesFilteredCounter = getTotalVerticesFilteredCounter(); totalEdgesMeter = getTotalEdgesLoadedMeter(); } @@ -127,9 +138,13 @@ public class VertexInputSplitsCallable<I extends WritableComparable, vertexReader.setConf( (ImmutableClassesGiraphConfiguration<I, V, E, Writable>) configuration); vertexReader.initialize(inputSplit, context); + long inputSplitVerticesLoaded = 0; + long inputSplitVerticesFiltered = 0; + long edgesSinceLastUpdate = 0; long inputSplitEdgesLoaded = 0; + while (vertexReader.nextVertex()) { Vertex<I, V, E, M> readerVertex = (Vertex<I, V, E, M>) vertexReader.getCurrentVertex(); @@ -144,12 +159,22 @@ public class VertexInputSplitsCallable<I extends WritableComparable, readerVertex.setConf(configuration); readerVertex.setGraphState(graphState); + ++inputSplitVerticesLoaded; + + if (vertexInputFilter.dropVertex(readerVertex)) { + ++inputSplitVerticesFiltered; + if 
(inputSplitVerticesFiltered % VERTICES_FILTERED_UPDATE_PERIOD == 0) { + totalVerticesFilteredCounter.inc(inputSplitVerticesFiltered); + inputSplitVerticesFiltered = 0; + } + continue; + } + PartitionOwner partitionOwner = bspServiceWorker.getVertexPartitionOwner(readerVertex.getId()); graphState.getWorkerClientRequestProcessor().sendVertexRequest( partitionOwner, readerVertex); context.progress(); // do this before potential data transfer - ++inputSplitVerticesLoaded; edgesSinceLastUpdate += readerVertex.getNumEdges(); // Update status every VERTICES_UPDATE_PERIOD vertices @@ -181,6 +206,11 @@ public class VertexInputSplitsCallable<I extends WritableComparable, break; } } + + totalVerticesMeter.mark(inputSplitVerticesLoaded % VERTICES_UPDATE_PERIOD); + totalEdgesMeter.mark(edgesSinceLastUpdate); + totalVerticesFilteredCounter.inc(inputSplitVerticesFiltered); + vertexReader.close(); return new VertexEdgeCount(inputSplitVerticesLoaded, inputSplitEdgesLoaded + edgesSinceLastUpdate); http://git-wip-us.apache.org/repos/asf/giraph/blob/afb3ecce/giraph-core/src/test/java/org/apache/giraph/io/TestEdgeInput.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/test/java/org/apache/giraph/io/TestEdgeInput.java b/giraph-core/src/test/java/org/apache/giraph/io/TestEdgeInput.java index 07d4cc8..cb1a8da 100644 --- a/giraph-core/src/test/java/org/apache/giraph/io/TestEdgeInput.java +++ b/giraph-core/src/test/java/org/apache/giraph/io/TestEdgeInput.java @@ -23,13 +23,14 @@ import org.apache.giraph.conf.GiraphConfiguration; import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; import org.apache.giraph.edge.ByteArrayEdges; import org.apache.giraph.edge.Edge; -import org.apache.giraph.graph.Vertex; import org.apache.giraph.graph.VertexValueFactory; import org.apache.giraph.io.formats.IdWithValueTextOutputFormat; import org.apache.giraph.io.formats.IntIntTextVertexValueInputFormat; import 
org.apache.giraph.io.formats.IntNullReverseTextEdgeInputFormat; import org.apache.giraph.io.formats.IntNullTextEdgeInputFormat; import org.apache.giraph.utils.InternalVertexRunner; +import org.apache.giraph.vertices.IntIntNullVertexDoNothing; +import org.apache.giraph.vertices.VertexCountEdges; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.NullWritable; import org.junit.Test; @@ -64,7 +65,7 @@ public class TestEdgeInput extends BspCase { }; GiraphConfiguration conf = new GiraphConfiguration(); - conf.setVertexClass(TestVertexWithNumEdges.class); + conf.setVertexClass(VertexCountEdges.class); conf.setOutEdgesClass(ByteArrayEdges.class); conf.setEdgeInputFormatClass(IntNullTextEdgeInputFormat.class); conf.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class); @@ -93,7 +94,7 @@ public class TestEdgeInput extends BspCase { }; GiraphConfiguration conf = new GiraphConfiguration(); - conf.setVertexClass(TestVertexWithNumEdges.class); + conf.setVertexClass(VertexCountEdges.class); conf.setOutEdgesClass(ByteArrayEdges.class); conf.setEdgeInputFormatClass(IntNullReverseTextEdgeInputFormat.class); conf.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class); @@ -129,7 +130,7 @@ public class TestEdgeInput extends BspCase { }; GiraphConfiguration conf = new GiraphConfiguration(); - conf.setVertexClass(TestVertexDoNothing.class); + conf.setVertexClass(IntIntNullVertexDoNothing.class); conf.setOutEdgesClass(ByteArrayEdges.class); conf.setVertexInputFormatClass(IntIntTextVertexValueInputFormat.class); conf.setEdgeInputFormatClass(IntNullTextEdgeInputFormat.class); @@ -160,7 +161,7 @@ public class TestEdgeInput extends BspCase { assertEquals(3, (int) values.get(5)); conf = new GiraphConfiguration(); - conf.setVertexClass(TestVertexWithNumEdges.class); + conf.setVertexClass(VertexCountEdges.class); conf.setOutEdgesClass(ByteArrayEdges.class); conf.setVertexInputFormatClass(IntIntTextVertexValueInputFormat.class); 
conf.setEdgeInputFormatClass(IntNullTextEdgeInputFormat.class); @@ -209,16 +210,7 @@ public class TestEdgeInput extends BspCase { assertEquals(0, (int) values.get(4)); } - public static class TestVertexWithNumEdges extends Vertex<IntWritable, - IntWritable, NullWritable, NullWritable> { - @Override - public void compute(Iterable<NullWritable> messages) throws IOException { - setValue(new IntWritable(getNumEdges())); - voteToHalt(); - } - } - - public static class TestVertexCheckEdgesType extends TestVertexWithNumEdges { + public static class TestVertexCheckEdgesType extends VertexCountEdges { @Override public void compute(Iterable<NullWritable> messages) throws IOException { assertFalse(getEdges() instanceof TestOutEdgesFilterEven); @@ -227,14 +219,6 @@ public class TestEdgeInput extends BspCase { } } - public static class TestVertexDoNothing extends Vertex<IntWritable, - IntWritable, NullWritable, NullWritable> { - @Override - public void compute(Iterable<NullWritable> messages) throws IOException { - voteToHalt(); - } - } - public static class TestVertexValueFactory implements VertexValueFactory<IntWritable> { @Override http://git-wip-us.apache.org/repos/asf/giraph/blob/afb3ecce/giraph-core/src/test/java/org/apache/giraph/io/TestFilters.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/test/java/org/apache/giraph/io/TestFilters.java b/giraph-core/src/test/java/org/apache/giraph/io/TestFilters.java new file mode 100644 index 0000000..83a366d --- /dev/null +++ b/giraph-core/src/test/java/org/apache/giraph/io/TestFilters.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.io; + +import org.apache.giraph.BspCase; +import org.apache.giraph.conf.GiraphConfiguration; +import org.apache.giraph.edge.Edge; +import org.apache.giraph.graph.Vertex; +import org.apache.giraph.io.filters.EdgeInputFilter; +import org.apache.giraph.io.filters.VertexInputFilter; +import org.apache.giraph.io.formats.IdWithValueTextOutputFormat; +import org.apache.giraph.io.formats.IntIntTextVertexValueInputFormat; +import org.apache.giraph.io.formats.IntNullTextEdgeInputFormat; +import org.apache.giraph.utils.InternalVertexRunner; +import org.apache.giraph.vertices.IntIntNullVertexDoNothing; +import org.apache.giraph.vertices.VertexCountEdges; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.NullWritable; +import org.junit.Test; + +import com.google.common.collect.Maps; + +import java.util.Map; + +import static org.junit.Assert.assertEquals; + +public class TestFilters extends BspCase { + public TestFilters() { + super(TestFilters.class.getName()); + } + + public static class EdgeFilter implements EdgeInputFilter<IntWritable, NullWritable> { + @Override public boolean dropEdge(IntWritable sourceId, Edge<IntWritable, NullWritable> edge) { + return sourceId.get() == 2; + } + } + + @Test + public void testEdgeFilter() throws Exception { + String[] edges = new String[] { + "1 2", + "2 3", + "2 4", + "4 1" + }; + + GiraphConfiguration conf = 
new GiraphConfiguration(); + conf.setVertexClass(VertexCountEdges.class); + conf.setEdgeInputFormatClass(IntNullTextEdgeInputFormat.class); + conf.setEdgeInputFilterClass(EdgeFilter.class); + conf.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class); + Iterable<String> results = InternalVertexRunner.run(conf, null, edges); + + Map<Integer, Integer> values = parseResults(results); + + assertEquals(2, values.size()); + assertEquals(1, (int) values.get(1)); + assertEquals(1, (int) values.get(4)); + } + + public static class VertexFilter implements VertexInputFilter<IntWritable, + NullWritable, NullWritable, NullWritable> { + @Override + public boolean dropVertex(Vertex<IntWritable, NullWritable, NullWritable, + NullWritable> vertex) { + int id = vertex.getId().get(); + return id == 2 || id == 3; + } + } + + @Test + public void testVertexFilter() throws Exception { + String[] vertices = new String[] { + "1 1", + "2 2", + "3 3", + "4 4" + }; + + GiraphConfiguration conf = new GiraphConfiguration(); + conf.setVertexClass(IntIntNullVertexDoNothing.class); + conf.setVertexInputFormatClass(IntIntTextVertexValueInputFormat.class); + conf.setVertexInputFilterClass(VertexFilter.class); + conf.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class); + Iterable<String> results = InternalVertexRunner.run(conf, vertices); + + Map<Integer, Integer> values = parseResults(results); + + assertEquals(2, values.size()); + assertEquals(1, (int) values.get(1)); + assertEquals(4, (int) values.get(4)); + } + + private static Map<Integer, Integer> parseResults(Iterable<String> results) { + Map<Integer, Integer> values = Maps.newHashMap(); + for (String line : results) { + String[] tokens = line.split("\\s+"); + int id = Integer.valueOf(tokens[0]); + int value = Integer.valueOf(tokens[1]); + values.put(id, value); + } + return values; + } +} 
http://git-wip-us.apache.org/repos/asf/giraph/blob/afb3ecce/giraph-core/src/test/java/org/apache/giraph/vertices/IntIntNullVertexDoNothing.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/test/java/org/apache/giraph/vertices/IntIntNullVertexDoNothing.java b/giraph-core/src/test/java/org/apache/giraph/vertices/IntIntNullVertexDoNothing.java new file mode 100644 index 0000000..c98d580 --- /dev/null +++ b/giraph-core/src/test/java/org/apache/giraph/vertices/IntIntNullVertexDoNothing.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.giraph.vertices; + +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.NullWritable; + +public class IntIntNullVertexDoNothing extends VertexDoNothing<IntWritable, + IntWritable, NullWritable, NullWritable> { +} http://git-wip-us.apache.org/repos/asf/giraph/blob/afb3ecce/giraph-core/src/test/java/org/apache/giraph/vertices/VertexCountEdges.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/test/java/org/apache/giraph/vertices/VertexCountEdges.java b/giraph-core/src/test/java/org/apache/giraph/vertices/VertexCountEdges.java new file mode 100644 index 0000000..9060bc7 --- /dev/null +++ b/giraph-core/src/test/java/org/apache/giraph/vertices/VertexCountEdges.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.giraph.vertices; + +import org.apache.giraph.graph.Vertex; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.NullWritable; + +import java.io.IOException; + +public class VertexCountEdges extends Vertex<IntWritable, IntWritable, + NullWritable, NullWritable> { + @Override + public void compute(Iterable<NullWritable> messages) throws IOException { + setValue(new IntWritable(getNumEdges())); + voteToHalt(); + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/afb3ecce/giraph-core/src/test/java/org/apache/giraph/vertices/VertexDoNothing.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/test/java/org/apache/giraph/vertices/VertexDoNothing.java b/giraph-core/src/test/java/org/apache/giraph/vertices/VertexDoNothing.java new file mode 100644 index 0000000..fac3fce --- /dev/null +++ b/giraph-core/src/test/java/org/apache/giraph/vertices/VertexDoNothing.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.giraph.vertices; + +import org.apache.giraph.graph.Vertex; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; + +import java.io.IOException; + +public class VertexDoNothing<I extends WritableComparable, V extends Writable, + E extends Writable, M extends Writable> extends Vertex<I, V, E, M> { + @Override + public void compute(Iterable<M> messages) throws IOException { + voteToHalt(); + } +} +
