Repository: giraph Updated Branches: refs/heads/trunk 94033dbbd -> 9a232d185
GIRAPH-848: Allowing plain computation with types being configurable (ikabiljo via majakabiljo) Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/9a232d18 Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/9a232d18 Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/9a232d18 Branch: refs/heads/trunk Commit: 9a232d185eb97a523359e2e2e73a869a4af15a85 Parents: 94033db Author: Maja Kabiljo <[email protected]> Authored: Fri Feb 14 14:15:53 2014 -0800 Committer: Maja Kabiljo <[email protected]> Committed: Fri Feb 14 14:15:53 2014 -0800 ---------------------------------------------------------------------- CHANGELOG | 2 + .../ImmutableClassesGiraphConfiguration.java | 20 ++-- .../org/apache/giraph/master/MasterCompute.java | 20 +++- .../apache/giraph/master/SuperstepClasses.java | 101 +++++++++++++++---- .../apache/giraph/master/TestSwitchClasses.java | 31 +++--- 5 files changed, 131 insertions(+), 43 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/giraph/blob/9a232d18/CHANGELOG ---------------------------------------------------------------------- diff --git a/CHANGELOG b/CHANGELOG index 32928f9..5ec1cc6 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,6 +1,8 @@ Giraph Change Log Release 1.1.0 - unreleased + GIRAPH-848: Allowing plain computation with types being configurable (ikabiljo via majakabiljo) + GIRAPH-854: fix for test fail due to GIRAPH-840 (pavanka via majakabiljo) GIRAPH-853: Fix concurrency issue in GiraphMetrics (majakabiljo) http://git-wip-us.apache.org/repos/asf/giraph/blob/9a232d18/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java index b33938a..2e8c935 100644 --- a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java +++ b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java @@ -34,8 +34,8 @@ import org.apache.giraph.graph.Computation; import org.apache.giraph.graph.DefaultVertex; import org.apache.giraph.graph.Language; import org.apache.giraph.graph.Vertex; -import org.apache.giraph.graph.VertexValueCombiner; import org.apache.giraph.graph.VertexResolver; +import org.apache.giraph.graph.VertexValueCombiner; import org.apache.giraph.io.EdgeInputFormat; import org.apache.giraph.io.EdgeOutputFormat; import org.apache.giraph.io.VertexInputFormat; @@ -76,8 +76,6 @@ import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.util.Progressable; -import static org.apache.giraph.utils.ReflectionUtils.getTypeArguments; - /** * The classes set here are immutable, the remaining configuration is mutable. * Classes are immutable and final to provide the best performance for @@ -1021,16 +1019,14 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable, Class<? extends Computation> computationClass = superstepClasses.getComputationClass(); classes.setComputationClass(computationClass); - if (computationClass != null) { - Class<?>[] classList = - getTypeArguments(TypesHolder.class, computationClass); - - Class<? extends Writable> incomingMsgValueClass = - (Class<? extends Writable>) classList[3]; + Class<? extends Writable> incomingMsgValueClass = + superstepClasses.getIncomingMessageClass(); + if (incomingMsgValueClass != null) { classes.setIncomingMessageValueClass(incomingMsgValueClass); - - Class<? extends Writable> outgoingMsgValueClass = - (Class<? extends Writable>) classList[4]; + } + Class<? extends Writable> outgoingMsgValueClass = + superstepClasses.getOutgoingMessageClass(); + if (outgoingMsgValueClass != null) { classes.setOutgoingMessageValueClass(outgoingMsgValueClass); } classes.setMessageCombiner(superstepClasses.getMessageCombinerClass()); http://git-wip-us.apache.org/repos/asf/giraph/blob/9a232d18/giraph-core/src/main/java/org/apache/giraph/master/MasterCompute.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/master/MasterCompute.java b/giraph-core/src/main/java/org/apache/giraph/master/MasterCompute.java index 287fdb9..d77a9b5 100644 --- a/giraph-core/src/main/java/org/apache/giraph/master/MasterCompute.java +++ b/giraph-core/src/main/java/org/apache/giraph/master/MasterCompute.java @@ -18,9 +18,9 @@ package org.apache.giraph.master; +import org.apache.giraph.aggregators.Aggregator; import org.apache.giraph.combiner.MessageCombiner; import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable; -import org.apache.giraph.aggregators.Aggregator; import org.apache.giraph.graph.Computation; import org.apache.giraph.graph.GraphState; import org.apache.hadoop.io.Writable; @@ -169,6 +169,24 @@ public abstract class MasterCompute return superstepClasses.getMessageCombinerClass(); } + /** + * Set incoming message class to be used + * @param incomingMessageClass incoming message class + */ + public final void setIncomingMessage( + Class<? extends Writable> incomingMessageClass) { + superstepClasses.setIncomingMessageClass(incomingMessageClass); + } + + /** + * Set outgoing message class to be used + * @param outgoingMessageClass outgoing message class + */ + public final void setOutgoingMessage( + Class<? extends Writable> outgoingMessageClass) { + superstepClasses.setOutgoingMessageClass(outgoingMessageClass); + } + @Override public final <A extends Writable> boolean registerAggregator( String name, Class<? extends Aggregator<A>> aggregatorClass) http://git-wip-us.apache.org/repos/asf/giraph/blob/9a232d18/giraph-core/src/main/java/org/apache/giraph/master/SuperstepClasses.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/master/SuperstepClasses.java b/giraph-core/src/main/java/org/apache/giraph/master/SuperstepClasses.java index 8344910..8145109 100644 --- a/giraph-core/src/main/java/org/apache/giraph/master/SuperstepClasses.java +++ b/giraph-core/src/main/java/org/apache/giraph/master/SuperstepClasses.java @@ -18,6 +18,13 @@ package org.apache.giraph.master; +import static org.apache.giraph.conf.GiraphConstants.COMPUTATION_LANGUAGE; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.lang.reflect.Modifier; + import org.apache.giraph.combiner.MessageCombiner; import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; import org.apache.giraph.conf.TypesHolder; @@ -26,22 +33,23 @@ import org.apache.giraph.graph.Language; import org.apache.giraph.utils.ReflectionUtils; import org.apache.giraph.utils.WritableUtils; import org.apache.hadoop.io.Writable; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.lang.reflect.Modifier; - -import static org.apache.giraph.conf.GiraphConstants.COMPUTATION_LANGUAGE; +import org.apache.log4j.Logger; /** * Holds Computation and MessageCombiner class. */ public class SuperstepClasses implements Writable { + /** Class logger */ + private static final Logger LOG = Logger.getLogger(SuperstepClasses.class); + /** Computation class to be used in the following superstep */ private Class<? extends Computation> computationClass; /** MessageCombiner class to be used in the following superstep */ private Class<? extends MessageCombiner> messageCombinerClass; + /** Incoming message class to be used in the following superstep */ + private Class<? extends Writable> incomingMessageClass; + /** Outgoing message class to be used in the following superstep */ + private Class<? extends Writable> outgoingMessageClass; /** * Default constructor @@ -80,6 +88,38 @@ public class SuperstepClasses implements Writable { return messageCombinerClass; } + /** + * Get incoming message class, either set directly, or through Computation + * @return incoming message class + */ + public Class<? extends Writable> getIncomingMessageClass() { + if (incomingMessageClass != null) { + return incomingMessageClass; + } + if (computationClass == null) { + return null; + } + Class[] computationTypes = ReflectionUtils.getTypeArguments( + TypesHolder.class, computationClass); + return computationTypes[3]; + } + + /** + * Get outgoing message class, either set directly, or through Computation + * @return outgoing message class + */ + public Class<? extends Writable> getOutgoingMessageClass() { + if (outgoingMessageClass != null) { + return outgoingMessageClass; + } + if (computationClass == null) { + return null; + } + Class[] computationTypes = ReflectionUtils.getTypeArguments( + TypesHolder.class, computationClass); + return computationTypes[4]; + } + public void setComputationClass( Class<? extends Computation> computationClass) { this.computationClass = computationClass; @@ -87,8 +127,17 @@ public class SuperstepClasses implements Writable { public void setMessageCombinerClass( Class<? extends MessageCombiner> messageCombinerClass) { - this.messageCombinerClass = - messageCombinerClass; + this.messageCombinerClass = messageCombinerClass; + } + + public void setIncomingMessageClass( + Class<? extends Writable> incomingMessageClass) { + this.incomingMessageClass = incomingMessageClass; + } + + public void setOutgoingMessageClass( + Class<? extends Writable> outgoingMessageClass) { + this.outgoingMessageClass = outgoingMessageClass; } /** @@ -118,11 +167,13 @@ public class SuperstepClasses implements Writable { verifyTypes(conf.getEdgeValueClass(), computationTypes[2], "Edge value", computationClass); + Class<?> incomingMessageType = getIncomingMessageClass(); + Class<?> outgoingMessageType = getOutgoingMessageClass(); + if (checkMatchingMesssageTypes) { - verifyTypes(conf.getOutgoingMessageValueClass(), computationTypes[3], - "Previous outgoing and new incoming message", computationClass); + verifyTypes(incomingMessageType, conf.getOutgoingMessageValueClass(), + "New incoming and previous outgoing message", computationClass); } - Class<?> outgoingMessageType = computationTypes[4]; if (outgoingMessageType.isInterface()) { throw new IllegalStateException("verifyTypesMatch: " + "Message type must be concrete class " + outgoingMessageType); @@ -154,9 +205,15 @@ public class SuperstepClasses implements Writable { private void verifyTypes(Class<?> expected, Class<?> actual, String typeDesc, Class<?> mainClass) { if (!expected.equals(actual)) { - throw new IllegalStateException("verifyTypes: " + typeDesc + " types " + - "don't match, in " + mainClass.getName() + " " + expected + + if (actual.isAssignableFrom(expected)) { + LOG.warn("verifyTypes: proceeding with assignable types : " + + typeDesc + " types, in " + mainClass.getName() + " " + expected + " expected, but " + actual + " found"); + } else { + throw new IllegalStateException("verifyTypes: " + typeDesc + + " types " + "don't match, in " + mainClass.getName() + " " + + expected + " expected, but " + actual + " found"); + } } } @@ -164,20 +221,30 @@ public class SuperstepClasses implements Writable { public void write(DataOutput output) throws IOException { WritableUtils.writeClass(computationClass, output); WritableUtils.writeClass(messageCombinerClass, output); + WritableUtils.writeClass(incomingMessageClass, output); + WritableUtils.writeClass(outgoingMessageClass, output); } @Override public void readFields(DataInput input) throws IOException { computationClass = WritableUtils.readClass(input); messageCombinerClass = WritableUtils.readClass(input); + incomingMessageClass = WritableUtils.readClass(input); + outgoingMessageClass = WritableUtils.readClass(input); } @Override public String toString() { String computationName = computationClass == null ? "_not_set_" : computationClass.getName(); - return "(computation=" + computationName + ",combiner=" + - ((messageCombinerClass == null) ? "null" : - messageCombinerClass.getName()) + ")"; + String combinerName = (messageCombinerClass == null) ? "null" : + messageCombinerClass.getName(); + String incomingName = (incomingMessageClass == null) ? "null" : + incomingMessageClass.getName(); + String outgoingName = (outgoingMessageClass == null) ? "null" : + outgoingMessageClass.getName(); + + return "(computation=" + computationName + ",combiner=" + combinerName + + ",incoming=" + incomingName + ",outgoing=" + outgoingName + ")"; } } http://git-wip-us.apache.org/repos/asf/giraph/blob/9a232d18/giraph-core/src/test/java/org/apache/giraph/master/TestSwitchClasses.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/test/java/org/apache/giraph/master/TestSwitchClasses.java b/giraph-core/src/test/java/org/apache/giraph/master/TestSwitchClasses.java index 29335af..833061e 100644 --- a/giraph-core/src/test/java/org/apache/giraph/master/TestSwitchClasses.java +++ b/giraph-core/src/test/java/org/apache/giraph/master/TestSwitchClasses.java @@ -18,6 +18,14 @@ package org.apache.giraph.master; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashSet; + +import junit.framework.Assert; + import org.apache.giraph.combiner.MessageCombiner; import org.apache.giraph.conf.GiraphConfiguration; import org.apache.giraph.graph.AbstractComputation; @@ -32,14 +40,6 @@ import org.junit.Test; import com.google.common.collect.Lists; import com.google.common.collect.Sets; -import junit.framework.Assert; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashSet; - /** Test switching Computation and MessageCombiner class during application */ public class TestSwitchClasses { @Test @@ -135,9 +135,14 @@ public class TestSwitchClasses { case 3: setComputation(Computation3.class); setMessageCombiner(SumMessageCombiner.class); + setIncomingMessage(DoubleWritable.class); + setOutgoingMessage(IntWritable.class); break; case 4: setComputation(Computation1.class); + // message types removed + setIncomingMessage(null); + setOutgoingMessage(null); break; default: haltComputation(); @@ -178,11 +183,11 @@ public class TestSwitchClasses { } public static class Computation3 extends AbstractComputation<IntWritable, - StatusValue, IntWritable, DoubleWritable, IntWritable> { + StatusValue, IntWritable, Writable, Writable> { @Override public void compute( Vertex<IntWritable, StatusValue, IntWritable> vertex, - Iterable<DoubleWritable> messages) throws IOException { + Iterable<Writable> messages) throws IOException { vertex.getValue().computations.add(3); vertex.getValue().addDoubleMessages(messages); @@ -238,10 +243,10 @@ public class TestSwitchClasses { messagesReceived.add(messagesList); } - public void addDoubleMessages(Iterable<DoubleWritable> messages) { + public void addDoubleMessages(Iterable<Writable> messages) { HashSet<Double> messagesList = new HashSet<Double>(); - for (DoubleWritable message : messages) { - messagesList.add(message.get()); + for (Writable message : messages) { + messagesList.add(((DoubleWritable)message).get()); } messagesReceived.add(messagesList); }
