http://git-wip-us.apache.org/repos/asf/giraph/blob/8675c84a/giraph-debugger/src/main/java/org/apache/giraph/debugger/mock/package-info.java ---------------------------------------------------------------------- diff --git a/giraph-debugger/src/main/java/org/apache/giraph/debugger/mock/package-info.java b/giraph-debugger/src/main/java/org/apache/giraph/debugger/mock/package-info.java new file mode 100644 index 0000000..27819a9 --- /dev/null +++ b/giraph-debugger/src/main/java/org/apache/giraph/debugger/mock/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Classes for generating test case code. + */ +package org.apache.giraph.debugger.mock;
http://git-wip-us.apache.org/repos/asf/giraph/blob/8675c84a/giraph-debugger/src/main/java/org/apache/giraph/debugger/package-info.java ---------------------------------------------------------------------- diff --git a/giraph-debugger/src/main/java/org/apache/giraph/debugger/package-info.java b/giraph-debugger/src/main/java/org/apache/giraph/debugger/package-info.java new file mode 100644 index 0000000..d864986 --- /dev/null +++ b/giraph-debugger/src/main/java/org/apache/giraph/debugger/package-info.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Giraph Debugger, named Graft. 
+ * @see <a href="https://github.com/semihsalihoglu/graft">Graft</a> + */ +package org.apache.giraph.debugger; http://git-wip-us.apache.org/repos/asf/giraph/blob/8675c84a/giraph-debugger/src/main/java/org/apache/giraph/debugger/utils/AggregatedValueWrapper.java ---------------------------------------------------------------------- diff --git a/giraph-debugger/src/main/java/org/apache/giraph/debugger/utils/AggregatedValueWrapper.java b/giraph-debugger/src/main/java/org/apache/giraph/debugger/utils/AggregatedValueWrapper.java new file mode 100644 index 0000000..1391f44 --- /dev/null +++ b/giraph-debugger/src/main/java/org/apache/giraph/debugger/utils/AggregatedValueWrapper.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.debugger.utils; + +import java.io.IOException; +import java.io.InputStream; + +import org.apache.giraph.debugger.GiraphAggregator.AggregatedValue; +import org.apache.giraph.utils.WritableUtils; +import org.apache.hadoop.io.Writable; + +import com.google.protobuf.ByteString; +import com.google.protobuf.GeneratedMessage; + +/** + * Wrapper class around + * {@link org.apache.giraph.debugger.GiraphAggregator.AggregatedValue} protocol + * buffer. 
+ * + * author: semihsalihoglu + */ +public class AggregatedValueWrapper extends BaseWrapper { + /** + * Key of the aggregator. + */ + private String key; + /** + * Value of the aggregator. + */ + private Writable value; + + /** + * Public constructor, initializing an empty aggregator. Intended to be used + * when reading an aggregator from a protobuf. + */ + public AggregatedValueWrapper() { } + + /** + * Constructor. Intended to be used by Graft when it's intercepting + * computations during debugging. + * @param key key of the aggregator. + * @param value value of the aggregator. + */ + public AggregatedValueWrapper(String key, Writable value) { + this.key = key; + this.value = value; + } + + @Override + public GeneratedMessage buildProtoObject() { + AggregatedValue.Builder aggregatedValueBuilder = AggregatedValue + .newBuilder(); + aggregatedValueBuilder.setWritableClass(value.getClass().getName()); + aggregatedValueBuilder.setKey(key); + aggregatedValueBuilder.setValue(ByteString.copyFrom(WritableUtils + .writeToByteArray(value))); + return aggregatedValueBuilder.build(); + } + + @Override + public GeneratedMessage parseProtoFromInputStream(InputStream inputStream) + throws IOException { + return AggregatedValue.parseFrom(inputStream); + } + + @Override + public void loadFromProto(GeneratedMessage protoObject) + throws ClassNotFoundException, IOException, InstantiationException, + IllegalAccessException { + AggregatedValue aggregatedValueProto = (AggregatedValue) protoObject; + this.value = (Writable) Class.forName( + aggregatedValueProto.getWritableClass()).newInstance(); + WritableUtils.readFieldsFromByteArray(aggregatedValueProto.getValue() + .toByteArray(), this.value); + this.key = aggregatedValueProto.getKey(); + } + + public String getKey() { + return key; + } + + public Writable getValue() { + return value; + } + + @Override + public String toString() { + StringBuilder stringBuilder = new StringBuilder(); + stringBuilder.append("\nkey: " + key); + 
stringBuilder + .append(" aggregatedValueClass: " + value.getClass().getName()); + stringBuilder.append(" value: " + value); + return stringBuilder.toString(); + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/8675c84a/giraph-debugger/src/main/java/org/apache/giraph/debugger/utils/AggregatorWrapper.java ---------------------------------------------------------------------- diff --git a/giraph-debugger/src/main/java/org/apache/giraph/debugger/utils/AggregatorWrapper.java b/giraph-debugger/src/main/java/org/apache/giraph/debugger/utils/AggregatorWrapper.java new file mode 100644 index 0000000..c7a5cd2 --- /dev/null +++ b/giraph-debugger/src/main/java/org/apache/giraph/debugger/utils/AggregatorWrapper.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.giraph.debugger.utils; + +import java.io.IOException; +import java.io.InputStream; + +import org.apache.giraph.aggregators.Aggregator; +import org.apache.giraph.debugger.GiraphAggregator.AggregatedValue; +import org.apache.giraph.debugger.GiraphAggregator.Aggregator.Builder; +import org.apache.giraph.utils.WritableUtils; +import org.apache.hadoop.io.Writable; + +import com.google.protobuf.GeneratedMessage; + +/** + * Wrapper class around + * {@link org.apache.giraph.debugger.GiraphAggregator.Aggregator} protocol + * buffer. + * + * author: semihsalihoglu + */ +@SuppressWarnings("rawtypes") +public class AggregatorWrapper extends BaseWrapper { + + /** + * Key of the aggregator. + */ + private String key; + /** + * The aggregator object. + */ + private final Aggregator<Writable> aggregator; + + /** + * Constructor. + * @param key key of the aggregator. + * @param aggregator the aggregator object. + */ + @SuppressWarnings("unchecked") + public AggregatorWrapper(String key, Aggregator aggregator) { + this.key = key; + this.aggregator = aggregator; + } + + @Override + public GeneratedMessage buildProtoObject() { + Builder aggregatorProtoBuilder = + org.apache.giraph.debugger.GiraphAggregator.Aggregator.newBuilder(); + aggregatorProtoBuilder.setAggregatorClass(aggregator.getClass().getName()); + aggregatorProtoBuilder + .setAggregatedValue((AggregatedValue) new AggregatedValueWrapper(key, + aggregator.getAggregatedValue()).buildProtoObject()); + return aggregatorProtoBuilder.build(); + } + + @Override + public GeneratedMessage parseProtoFromInputStream(InputStream inputStream) + throws IOException { + return org.apache.giraph.debugger.GiraphAggregator.Aggregator + .parseFrom(inputStream); + } + + @SuppressWarnings("unchecked") + @Override + public void loadFromProto(GeneratedMessage protoObject) + throws ClassNotFoundException, IOException, InstantiationException, + IllegalAccessException { + org.apache.giraph.debugger.GiraphAggregator.Aggregator 
aggregatorProto = + (org.apache.giraph.debugger.GiraphAggregator.Aggregator) protoObject; + Aggregator<Writable> giraphAggregator = + (org.apache.giraph.aggregators.Aggregator<Writable>) Class + .forName(aggregatorProto.getAggregatorClass()).newInstance(); + AggregatedValue aggregatedValueProto = aggregatorProto.getAggregatedValue(); + this.key = aggregatedValueProto.getKey(); + Writable giraphAggregatedValue = (Writable) Class.forName( + aggregatedValueProto.getWritableClass()).newInstance(); + WritableUtils.readFieldsFromByteArray(aggregatedValueProto.getValue() + .toByteArray(), giraphAggregatedValue); + giraphAggregator.setAggregatedValue(giraphAggregatedValue); + } + + public String getKey() { + return key; + } + + public Aggregator<Writable> getAggregator() { + return aggregator; + } + + @Override + public String toString() { + StringBuilder stringBuilder = new StringBuilder(); + stringBuilder.append("\nkey: " + key); + stringBuilder + .append(" aggregatorClass: " + aggregator.getClass().getName()); + stringBuilder.append(" aggregatedValueClass: " + + aggregator.getAggregatedValue().getClass().getName()); + stringBuilder.append(" value: " + aggregator.getAggregatedValue()); + return stringBuilder.toString(); + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/8675c84a/giraph-debugger/src/main/java/org/apache/giraph/debugger/utils/AsyncHDFSWriteService.java ---------------------------------------------------------------------- diff --git a/giraph-debugger/src/main/java/org/apache/giraph/debugger/utils/AsyncHDFSWriteService.java b/giraph-debugger/src/main/java/org/apache/giraph/debugger/utils/AsyncHDFSWriteService.java new file mode 100644 index 0000000..5a932b1 --- /dev/null +++ b/giraph-debugger/src/main/java/org/apache/giraph/debugger/utils/AsyncHDFSWriteService.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.debugger.utils; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.log4j.Logger; + +import com.google.protobuf.GeneratedMessage; + +/** + * A utility class for writing to HDFS asynchronously. + */ +public class AsyncHDFSWriteService { + + /** + * Logger for this class. + */ + protected static final Logger LOG = Logger + .getLogger(AsyncHDFSWriteService.class); + + /** + * The thread pool that will handle the synchronous writing, and hide the + * latency from the callers. + */ + private static ExecutorService HDFS_ASYNC_WRITE_SERVICE = Executors + .newFixedThreadPool(2); + static { + // Make sure we finish writing everything before shutting down the VM. 
+ Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { + @Override + public void run() { + LOG.info("Shutting down writer"); + HDFS_ASYNC_WRITE_SERVICE.shutdown(); + LOG.info("Waiting until finishes all writes"); + try { + HDFS_ASYNC_WRITE_SERVICE.awaitTermination(Long.MAX_VALUE, + TimeUnit.NANOSECONDS); + LOG.info("Finished all writes"); + } catch (InterruptedException e) { + LOG.error("Could not finish all writes"); + e.printStackTrace(); + } + } + })); + } + + /** + * Not for instantiation. + */ + private AsyncHDFSWriteService() { + } + + /** + * Writes given protobuf message to the given filesystem path in the + * background. + * + * @param message + * The proto message to write. + * @param fs + * The HDFS filesystem to write to. + * @param fileName + * The HDFS path to write the message to. + */ + public static void writeToHDFS(final GeneratedMessage message, + final FileSystem fs, final String fileName) { + HDFS_ASYNC_WRITE_SERVICE.submit(new Runnable() { + @Override + public void run() { + Path pt = new Path(fileName); + try { + LOG.info("Writing " + fileName + " at " + fs.getUri()); + OutputStream wrappedStream = fs.create(pt, true).getWrappedStream(); + message.writeTo(wrappedStream); + wrappedStream.close(); + LOG.info("Done writing " + fileName); + } catch (IOException e) { + e.printStackTrace(); + } + } + }); + } + +} http://git-wip-us.apache.org/repos/asf/giraph/blob/8675c84a/giraph-debugger/src/main/java/org/apache/giraph/debugger/utils/BaseScenarioAndIntegrityWrapper.java ---------------------------------------------------------------------- diff --git a/giraph-debugger/src/main/java/org/apache/giraph/debugger/utils/BaseScenarioAndIntegrityWrapper.java b/giraph-debugger/src/main/java/org/apache/giraph/debugger/utils/BaseScenarioAndIntegrityWrapper.java new file mode 100644 index 0000000..7a736f5 --- /dev/null +++ b/giraph-debugger/src/main/java/org/apache/giraph/debugger/utils/BaseScenarioAndIntegrityWrapper.java @@ -0,0 +1,69 @@ +/* + * 
Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.debugger.utils; + +import org.apache.hadoop.io.WritableComparable; + +/** + * Base wrapper class for {@link GiraphVertexScenarioWrapper}, + * {@link MsgIntegrityViolationWrapper}, + * {@link VertexValueIntegrityViolationWrapper}. + * + * author: semihsalihoglu + * + * @param <I> + * Vertex id + */ +@SuppressWarnings("rawtypes") +public abstract class BaseScenarioAndIntegrityWrapper< + I extends WritableComparable> extends BaseWrapper { + /** + * Class of the type of the vertex IDs. + */ + protected Class<I> vertexIdClass; + + /** + * Default empty constructor. + */ + protected BaseScenarioAndIntegrityWrapper() { }; + + /** + * Default constructor initializing the vertexIdClass. + * @param vertexIdClass vertex id class. + */ + public BaseScenarioAndIntegrityWrapper(Class<I> vertexIdClass) { + initialize(vertexIdClass); + } + + public Class<I> getVertexIdClass() { + return vertexIdClass; + } + + /** + * Initializes vertex id class. + * @param vertexIdClass vertex id class. 
+ */ + public void initialize(Class<I> vertexIdClass) { + this.vertexIdClass = vertexIdClass; + } + + @Override + public String toString() { + return "\nvertexIdClass: " + getVertexIdClass().getCanonicalName(); + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/8675c84a/giraph-debugger/src/main/java/org/apache/giraph/debugger/utils/BaseWrapper.java ---------------------------------------------------------------------- diff --git a/giraph-debugger/src/main/java/org/apache/giraph/debugger/utils/BaseWrapper.java b/giraph-debugger/src/main/java/org/apache/giraph/debugger/utils/BaseWrapper.java new file mode 100644 index 0000000..e0a6d51 --- /dev/null +++ b/giraph-debugger/src/main/java/org/apache/giraph/debugger/utils/BaseWrapper.java @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.giraph.debugger.utils; + +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.net.URL; +import java.net.URLClassLoader; + +import org.apache.giraph.utils.WritableUtils; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Writable; + +import com.google.protobuf.ByteString; +import com.google.protobuf.GeneratedMessage; + +/** + * Base class for all wrapper classes that wrap a protobuf. + * + * author: semihsalihoglu + */ +public abstract class BaseWrapper { + + /** + * @param <U> type of the upperBound class. + * @param clazz a {@link Class} object that will be cast. + * @param upperBound another {@link Class} object that clazz will be cast + * into. + * @return clazz cast to upperBound. + */ + @SuppressWarnings("unchecked") + protected <U> Class<U> castClassToUpperBound(Class<?> clazz, + Class<U> upperBound) { + if (!upperBound.isAssignableFrom(clazz)) { + throw new IllegalArgumentException("The class " + clazz.getName() + + " is not a subclass of " + upperBound.getName()); + } + return (Class<U>) clazz; + } + + /** + * Utility method to read the contents of a {@link ByteString} to the given + * {@link Writable}. + * @param byteString a {@link ByteString} object. + * @param writable a {@link Writable} object. + */ + void fromByteString(ByteString byteString, Writable writable) { + if (writable != null) { + WritableUtils.readFieldsFromByteArray(byteString.toByteArray(), writable); + } + } + + /** + * @param writable a {@link Writable} object. + * @return the contents of writable as {@link ByteString}. + */ + ByteString toByteString(Writable writable) { + return ByteString.copyFrom(WritableUtils.writeToByteArray(writable)); + } + + /** + * Saves this wrapper object to a file. 
+ * @param fileName the full path of the file to save this wrapper object. + * @throws IOException thrown when there is an exception during the writing. + */ + public void save(String fileName) throws IOException { + try (FileOutputStream output = new FileOutputStream(fileName)) { + buildProtoObject().writeTo(output); + output.close(); + } + } + + + /** + * Saves this wrapper object to a file in HDFS. + * @param fs {@link FileSystem} to use for saving to HDFS. + * @param fileName the full path of the file to save this wrapper object. + * @throws IOException thrown when there is an exception during the writing. + */ + public void saveToHDFS(FileSystem fs, String fileName) throws IOException { + AsyncHDFSWriteService.writeToHDFS(buildProtoObject(), fs, fileName); + } + + /** + * @return the protobuf representing this wrapper object. + */ + public abstract GeneratedMessage buildProtoObject(); + + /** + * Loads a protocol buffer stored in a file into this wrapper object. + * @param fileName the full path of the file where the protocol buffer is + * stored. + */ + public void load(String fileName) throws ClassNotFoundException, IOException, + InstantiationException, IllegalAccessException { + try (FileInputStream inputStream = new FileInputStream(fileName)) { + loadFromProto(parseProtoFromInputStream(inputStream)); + } + } + + /** + * Loads a protocol buffer stored in a file in HDFS into this wrapper object. + * @param fs {@link FileSystem} to use for reading from HDFS. + * @param fileName the full path of the file where the protocol buffer is + * stored. + */ + public void loadFromHDFS(FileSystem fs, String fileName) + throws ClassNotFoundException, IOException, InstantiationException, + IllegalAccessException { + try (FSDataInputStream inputStream = fs.open(new Path(fileName))) { + loadFromProto(parseProtoFromInputStream(inputStream)); + } + } + + /** + * Constructs a protobuf representing this wrapper object from an + * {@link InputStream}. 
+ * @param inputStream {@link InputStream} containing the contents of this + * wrapper object. + * @return the protobuf version of this wrapper object. + */ + public abstract GeneratedMessage parseProtoFromInputStream( + InputStream inputStream) throws IOException; + + /** + * Constructs this wrapper object from a protobuf. + * @param protoObject protobuf to read when constructing this wrapper object. + */ + public abstract void loadFromProto(GeneratedMessage protoObject) + throws ClassNotFoundException, IOException, InstantiationException, + IllegalAccessException; + + /** + * Add given URLs to the CLASSPATH before loading from HDFS. To do so, we hack + * the system class loader, assuming it is an URLClassLoader. + * + * XXX Setting the currentThread's context class loader has no effect on + * Class#forName(). + * + * @see http://stackoverflow.com/a/12963811/390044 + * @param fs {@link FileSystem} to use for reading from HDFS. + * @param fileName the name of the file in HDFS. + * @param classPaths a possible list of class paths that may contain the + * directories containing the file. + */ + public void loadFromHDFS(FileSystem fs, String fileName, URL... 
classPaths) + throws ClassNotFoundException, InstantiationException, + IllegalAccessException, IOException { + for (URL url : classPaths) { + addPath(url); + } + loadFromHDFS(fs, fileName); + } + + /** + * @param u + * the URL to add to the CLASSPATH + * @see http://stackoverflow.com/a/252967/390044 + */ + private static void addPath(URL u) { + // need to do add path to Classpath with reflection since the + // URLClassLoader.addURL(URL url) method is protected: + ClassLoader cl = ClassLoader.getSystemClassLoader(); + if (cl instanceof URLClassLoader) { + URLClassLoader urlClassLoader = (URLClassLoader) cl; + Class<URLClassLoader> urlClass = URLClassLoader.class; + try { + Method method = urlClass.getDeclaredMethod("addURL", + new Class[] { URL.class }); + method.setAccessible(true); + method.invoke(urlClassLoader, u); + } catch (NoSuchMethodException | SecurityException | + IllegalAccessException | IllegalArgumentException | + InvocationTargetException e) { + throw new IllegalStateException("Cannot add URL to system ClassLoader", + e); + } + } else { + throw new IllegalStateException( + "Cannot add URL to system ClassLoader of type " + + cl.getClass().getSimpleName()); + } + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/8675c84a/giraph-debugger/src/main/java/org/apache/giraph/debugger/utils/CommonVertexMasterContextWrapper.java ---------------------------------------------------------------------- diff --git a/giraph-debugger/src/main/java/org/apache/giraph/debugger/utils/CommonVertexMasterContextWrapper.java b/giraph-debugger/src/main/java/org/apache/giraph/debugger/utils/CommonVertexMasterContextWrapper.java new file mode 100644 index 0000000..e6a3858 --- /dev/null +++ b/giraph-debugger/src/main/java/org/apache/giraph/debugger/utils/CommonVertexMasterContextWrapper.java @@ -0,0 +1,216 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.debugger.utils; + +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +import org.apache.giraph.conf.GiraphConfiguration; +import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; +import org.apache.giraph.debugger.GiraphAggregator.AggregatedValue; +import org.apache.giraph.debugger.Scenario.CommonVertexMasterContext; + +import com.google.protobuf.GeneratedMessage; + +/** + * Wrapper class around + * {@link org.apache.giraph.debugger.Scenario.CommonVertexMasterContext} + * protocol buffer. + */ +@SuppressWarnings("rawtypes") +public class CommonVertexMasterContextWrapper extends BaseWrapper { + /** + * Wraps the {@link ImmutableClassesGiraphConfiguration} which + * {@link org.apache.giraph.debugger.Scenario.CommonVertexMasterContext} + * exposes. + */ + private ImmutableClassesGiraphConfiguration immutableClassesConfig = null; + /** + * Wraps the superstep number which + * {@link org.apache.giraph.debugger.Scenario.CommonVertexMasterContext} + * exposes. + */ + private long superstepNo; + /** + * Wraps the totalNumVertices which + * {@link org.apache.giraph.debugger.Scenario.CommonVertexMasterContext} + * exposes. 
+ */ + private long totalNumVertices; + /** + * Wraps the totalNumEdges which + * {@link org.apache.giraph.debugger.Scenario.CommonVertexMasterContext} + * exposes. + */ + private long totalNumEdges; + /** + * Wraps the aggregated values from the previous superstep which + * {@link org.apache.giraph.debugger.Scenario.CommonVertexMasterContext} + * exposes. + */ + private List<AggregatedValueWrapper> previousAggregatedValueWrappers; + + /** + * Default constructor. Initializes superstepNo, totalNumVertices, and + * totalNumEdges to -1. Initializes an empty list of aggregated values. + */ + public CommonVertexMasterContextWrapper() { + this.superstepNo = -1; + this.totalNumVertices = -1; + this.totalNumEdges = -1; + this.previousAggregatedValueWrappers = new ArrayList<>(); + } + + /** + * Constructor with immutableClassesConfig, superstepNo, totalNumVertices, + * and totalNumEdges. Does not initialize previousAggregatedValueWrappers. + * @param immutableClassesConfig the + * {@link ImmutableClassesGiraphConfiguration} to initialize. + * @param superstepNo superstep number to initialize. + * @param totalNumVertices total number of vertices to initialize. + * @param totalNumEdges total number of edges to initialize. 
+ */ + public CommonVertexMasterContextWrapper( + ImmutableClassesGiraphConfiguration immutableClassesConfig, + long superstepNo, long totalNumVertices, long totalNumEdges) { + this.immutableClassesConfig = immutableClassesConfig; + this.superstepNo = superstepNo; + this.totalNumVertices = totalNumVertices; + this.totalNumEdges = totalNumEdges; + } + + public long getSuperstepNoWrapper() { + return superstepNo; + } + + public long getTotalNumVerticesWrapper() { + return totalNumVertices; + } + + public long getTotalNumEdgesWrapper() { + return totalNumEdges; + } + + public void setSuperstepNoWrapper(long superstepNo) { + this.superstepNo = superstepNo; + } + + public void setTotalNumVerticesWrapper(long totalNumVertices) { + this.totalNumVertices = totalNumVertices; + } + + public void setTotalNumEdgesWrapper(long totalNumEdges) { + this.totalNumEdges = totalNumEdges; + } + + /** + * Adds an aggregated value from the previous superstep. + * @param previousAggregatedValueWrapper an {@link AggregatedValueWrapper} + * object wrapping the aggregated value. 
+ */ + public void addPreviousAggregatedValue( + AggregatedValueWrapper previousAggregatedValueWrapper) { + this.previousAggregatedValueWrappers.add(previousAggregatedValueWrapper); + } + + public void setPreviousAggregatedValues( + List<AggregatedValueWrapper> previousAggregatedValueWrappers) { + this.previousAggregatedValueWrappers = previousAggregatedValueWrappers; + } + + public Collection<AggregatedValueWrapper> getPreviousAggregatedValues() { + return previousAggregatedValueWrappers; + } + + public ImmutableClassesGiraphConfiguration getConfig() { + return immutableClassesConfig; + } + + public void setConfig( + ImmutableClassesGiraphConfiguration immutableClassesConfig) { + this.immutableClassesConfig = immutableClassesConfig; + } + + @Override + public GeneratedMessage buildProtoObject() { + CommonVertexMasterContext.Builder commonContextBuilder = + CommonVertexMasterContext.newBuilder(); + commonContextBuilder.setConf(toByteString(immutableClassesConfig)) + .setSuperstepNo(getSuperstepNoWrapper()) + .setTotalNumVertices(getTotalNumVerticesWrapper()) + .setTotalNumEdges(getTotalNumEdgesWrapper()); + + for (AggregatedValueWrapper aggregatedValueWrapper : + getPreviousAggregatedValues()) { + commonContextBuilder + .addPreviousAggregatedValue((AggregatedValue) aggregatedValueWrapper + .buildProtoObject()); + } + return commonContextBuilder.build(); + } + + @Override + public void loadFromProto(GeneratedMessage generatedMessage) + throws ClassNotFoundException, IOException, InstantiationException, + IllegalAccessException { + CommonVertexMasterContext commonContext = (CommonVertexMasterContext) + generatedMessage; + GiraphConfiguration config = new GiraphConfiguration(); + fromByteString(commonContext.getConf(), config); + ImmutableClassesGiraphConfiguration immutableClassesGiraphConfiguration = + new ImmutableClassesGiraphConfiguration(config); + setConfig(immutableClassesGiraphConfiguration); + + setSuperstepNoWrapper(commonContext.getSuperstepNo()); + 
setTotalNumVerticesWrapper(commonContext.getTotalNumVertices()); + setTotalNumEdgesWrapper(commonContext.getTotalNumEdges()); + + for (AggregatedValue previousAggregatedValueProto : commonContext + .getPreviousAggregatedValueList()) { + AggregatedValueWrapper aggregatedValueWrapper = + new AggregatedValueWrapper(); + aggregatedValueWrapper.loadFromProto(previousAggregatedValueProto); + addPreviousAggregatedValue(aggregatedValueWrapper); + } + } + + @Override + public GeneratedMessage parseProtoFromInputStream(InputStream inputStream) + throws IOException { + return CommonVertexMasterContext.parseFrom(inputStream); + } + + @Override + public String toString() { + StringBuilder stringBuilder = new StringBuilder(); + stringBuilder.append("\nconfig: " + immutableClassesConfig); // null-safe + stringBuilder.append("\nsuperstepNo: " + getSuperstepNoWrapper()); + stringBuilder.append("\ntotalNumVertices: " + totalNumVertices); + stringBuilder.append("\ntotalNumEdges: " + totalNumEdges); + stringBuilder.append("\nnumAggregators: " + + getPreviousAggregatedValues().size()); + for (AggregatedValueWrapper aggregatedValueWrapper : + getPreviousAggregatedValues()) { + stringBuilder.append("\n" + aggregatedValueWrapper); + } + return stringBuilder.toString(); + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/8675c84a/giraph-debugger/src/main/java/org/apache/giraph/debugger/utils/DebuggerUtils.java ---------------------------------------------------------------------- diff --git a/giraph-debugger/src/main/java/org/apache/giraph/debugger/utils/DebuggerUtils.java b/giraph-debugger/src/main/java/org/apache/giraph/debugger/utils/DebuggerUtils.java new file mode 100644 index 0000000..03fdf68 --- /dev/null +++ b/giraph-debugger/src/main/java/org/apache/giraph/debugger/utils/DebuggerUtils.java @@ -0,0 +1,375 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.debugger.utils; + +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.IOException; + +import org.apache.giraph.utils.ReflectionUtils; +import org.apache.giraph.utils.WritableUtils; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Writable; + +/** + * Contains common utilities shared by one or more of: + * <ul> + * <li>the Graft instrumenter, + * <li>the server that serves data to the Graft GUI by talking to HDFS, and + * <li>the wrapper classes around the scenario protocol buffers that are + * stored under {@link org.apache.giraph.debugger.utils}. + * </ul> + * + * author semihsalihoglu + */ +public class DebuggerUtils { + + /** + * The path to the HDFS root for storing Graft traces. + */ + public static final String TRACE_ROOT = System.getProperty( + "giraph.debugger.traceRootAtHDFS", + "/user/" + System.getProperty("user.name") + "/giraph-debug-traces"); + /** + * The path to the HDFS root for storing cached Giraph job jars. + */ + public static final String JARCACHE_HDFS = System.getProperty( + "giraph.debugger.jobCacheAtHDFS", TRACE_ROOT + "/jars"); + /** + * The path to the local root directory for storing cached Giraph job jars. 
+ */ + public static final String JARCACHE_LOCAL = System.getProperty( + "giraph.debugger.jobCacheLocal", System.getenv("HOME") + + "/.giraph-debug/jars"); + + /** + * Enumeration of different trace files Graft saves in HDFS. + */ + public enum DebugTrace { + /** + * Regular trace capturing a vertex computation. + */ + VERTEX_REGULAR("regular vertex"), + /** + * Captured exception from a vertex. + */ + VERTEX_EXCEPTION("exception from a vertex"), + /** + * All traces of a particular vertex. + */ + VERTEX_ALL, + /** + * Captured message integrity violations. + */ + INTEGRITY_MESSAGE_ALL("invalid messages"), + /** + * Trace of the vertex computation that sends an invalid message. + */ + INTEGRITY_MESSAGE_SINGLE_VERTEX("vertex sending invalid messages"), + /** + * Trace of a vertex whose value violates an integrity constraint. + */ + INTEGRITY_VERTEX("vertex having invalid value"), + /** + * Regular trace of a MasterCompute. + */ + MASTER_REGULAR("regular MasterCompute"), + /** + * Trace capturing exception thrown from a MasterCompute. + */ + MASTER_EXCEPTION("exception from MasterCompute"), + /** + * All traces of MasterCompute. + */ + MASTER_ALL, + /** + * The jar signature that links the instrumented jar. + */ + JAR_SIGNATURE; + + /** + * The label of this debug trace, or null if it was created without one. + */ + private final String label; + + /** + * Creates a DebugTrace instance without a label. + */ + private DebugTrace() { + this.label = null; + } + + /** + * Creates a DebugTrace instance with a specific label. + * @param label The label. + */ + private DebugTrace(String label) { + this.label = label; + } + + /** + * Returns the label. + * @return the label, or null for a trace created without one + */ + public String getLabel() { + return label; + } + } + + /** + * File name prefix for regular traces. + */ + public static final String PREFIX_TRACE_REGULAR = "reg"; + /** + * File name prefix for exception traces. + */ + public static final String PREFIX_TRACE_EXCEPTION = "err"; + /** + * File name prefix for vertex value integrity traces. 
+ */ + public static final String PREFIX_TRACE_VERTEX = "vv"; + /** + * File name prefix for message integrity traces. + */ + public static final String PREFIX_TRACE_MESSAGE = "msg"; + + /** + * Disallows creating instances of this class. + */ + private DebuggerUtils() { } + + /** + * Makes a clone of a writable object. Giraph sometimes reuses and overwrites + * the bytes inside {@link Writable} objects. For example, when reading the + * incoming messages inside a {@code Computation} class through the iterator + * Giraph supplies, Giraph uses only one object. Therefore in order to keep a + * reference to a particular object, we need to clone it. + * + * @param <T> + * Type of the clazz. + * @param writableToClone + * Writable object to clone. + * @param clazz + * Class of writableToClone. + * @return a clone, or writableToClone itself if clazz is NullWritable. + */ + public static <T extends Writable> T makeCloneOf(T writableToClone, + Class<T> clazz) { + T idCopy = newInstance(clazz); + // Return value is null if clazz is assignable to NullWritable. + if (idCopy == null) { + return writableToClone; + } + ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + DataOutputStream dataOutputStream = new DataOutputStream( + byteArrayOutputStream); + try { + writableToClone.write(dataOutputStream); + } catch (IOException e) { + // Throwing a runtime exception because the methods that (indirectly) + // call this one, through methods such as addNeighborWrapper or + // addOutgoingMessageWrapper, implement abstract classes or + // interfaces of Giraph that we can't edit to include a throws + // statement. The original IOException is kept as the cause so no + // diagnostic information is lost. + throw new RuntimeException(e); + } + // toByteArray() never returns null (it always hands back a fresh copy of + // the written bytes), so take that copy once and deserialize it straight + // into the new instance. The stream is local and is discarded afterwards. + byte[] serializedBytes = byteArrayOutputStream.toByteArray(); + WritableUtils.readFieldsFromByteArray( + serializedBytes, idCopy); + return idCopy; + } + + /** + * Instantiates a new object from the given class. + * + * @param <T> The type of the new instance to create. 
+ * @param theClass The class of the new instance to create. + * @return The newly created instance, or null if theClass is NullWritable. + */ + public static <T> T newInstance(Class<T> theClass) { + return NullWritable.class.isAssignableFrom(theClass) ? null : + ReflectionUtils.newInstance(theClass); + } + + /** + * Returns the full trace file name for the given type of debug trace. One or + * more of the passed arguments will be used in the file name. + * + * @param debugTrace The debug trace for generating the file name. + * @param jobId The job id of the job the debug trace belongs to. + * @param superstepNo The superstep number of the debug trace. + * @param vertexId The vertex id of the debug trace. + * @param taskId The task id of the debug trace. + * @return The full trace file name. + */ + public static String getFullTraceFileName(DebugTrace debugTrace, + String jobId, Long superstepNo, String vertexId, String taskId) { + return getTraceFileRoot(jobId) + "/" + + getTraceFileName(debugTrace, superstepNo, vertexId, taskId); + } + + /** + * A convenience method around + * {@link #getFullTraceFileName(DebugTrace, String, Long, String, String)}. + * + * @param superstepNo The superstep number of the trace. + * @param jobId The job id of the trace. + * @param taskId The task id of the trace. + * @return The full trace file name for debug trace of message integrity. + */ + public static String getMessageIntegrityAllTraceFullFileName( + long superstepNo, String jobId, String taskId) { + return getFullTraceFileName(DebugTrace.INTEGRITY_MESSAGE_ALL, jobId, + superstepNo, null /* no vertex Id */, taskId); + } + + /** + * A convenience method around + * {@link #getFullTraceFileName(DebugTrace, String, Long, String, String)}. + * + * @param masterDebugTrace The debug trace for generating the file name. + * @param jobId The job id the debug trace belongs to. + * @param superstepNo The superstep number. + * @return The full trace file name of the master compute trace. 
+ */ + public static String getFullMasterTraceFileName(DebugTrace masterDebugTrace, + String jobId, Long superstepNo) { + return getFullTraceFileName(masterDebugTrace, jobId, superstepNo, + null /* no vertex Id */, null /* no task Id */); + } + + /** + * A convenience method around + * {@link #getFullTraceFileName(DebugTrace, String, Long, String, String)}. + * + * @param debugTrace The debug trace for generating the file name. + * @param jobId The job id the debug trace belongs to. + * @param superstepNo The superstep number. + * @param vertexId The vertex id of the debug trace. + * @return The full trace file name without the task id. + */ + public static String getFullTraceFileName(DebugTrace debugTrace, + String jobId, Long superstepNo, String vertexId) { + return getFullTraceFileName(debugTrace, jobId, superstepNo, vertexId, + null /* no task Id */); + } + + /** + * Maps debug trace to file names with additional parameters. + * + * @param debugTrace The debug trace. + * @param superstepNo The superstep number. + * @param vertexId The vertex id. + * @param taskId The task id. + * @return The file name for the debug trace, or null for the *_ALL traces. 
+ */ + private static String getTraceFileName(DebugTrace debugTrace, + Long superstepNo, String vertexId, String taskId) { + String format = getTraceFileFormat(debugTrace); + switch (debugTrace) { + case VERTEX_REGULAR: + return String.format(format, superstepNo, vertexId); + case VERTEX_EXCEPTION: + return String.format(format, superstepNo, vertexId); + case INTEGRITY_MESSAGE_ALL: + return String.format(format, taskId, superstepNo); + case INTEGRITY_MESSAGE_SINGLE_VERTEX: + return String.format(format, superstepNo, vertexId); + case INTEGRITY_VERTEX: + return String.format(format, superstepNo, vertexId); + case MASTER_REGULAR: + return String.format(format, superstepNo); + case MASTER_EXCEPTION: + return String.format(format, superstepNo); + default: + return null; + } + } + + /** + * Returns the file name format of the trace file for the given debug trace. + * For VERTEX_ALL and MASTER_ALL the format embeds a (reg|err) alternation. + * + * @param debugTrace The debug trace. + * @return The file name format for the debug trace to be used with + * {@link String#format(String, Object...)}. + * @throws IllegalArgumentException If the debug trace is not supported. + */ + public static String getTraceFileFormat(DebugTrace debugTrace) { + // XXX is this function giving the String format? or regex? Seems latter. 
+ switch (debugTrace) { + case VERTEX_REGULAR: + return PREFIX_TRACE_REGULAR + "_stp_%s_vid_%s.tr"; + case VERTEX_EXCEPTION: + return PREFIX_TRACE_EXCEPTION + "_stp_%s_vid_%s.tr"; + case VERTEX_ALL: + return String.format("(%s|%s)%s", PREFIX_TRACE_REGULAR, + PREFIX_TRACE_EXCEPTION, "_stp_%s_vid_%s.tr"); + case INTEGRITY_MESSAGE_ALL: + return "task_%s_msg_intgrty_stp_%s.tr"; + case INTEGRITY_MESSAGE_SINGLE_VERTEX: + return PREFIX_TRACE_MESSAGE + "_intgrty_stp_%s_vid_%s.tr"; + case INTEGRITY_VERTEX: + return PREFIX_TRACE_VERTEX + "_intgrty_stp_%s_vid_%s.tr"; + case MASTER_REGULAR: + return "master_" + PREFIX_TRACE_REGULAR + "_stp_%s.tr"; + case MASTER_EXCEPTION: + return "master_" + PREFIX_TRACE_EXCEPTION + "_stp_%s.tr"; + case MASTER_ALL: // The suffix already supplies the "_" separator. + return String.format("master_(%s|%s)%s", PREFIX_TRACE_REGULAR, + PREFIX_TRACE_EXCEPTION, "_stp_%s.tr"); + default: + throw new IllegalArgumentException("DebugTrace not supported."); + } + } + + /** + * Maps prefix back to the corresponding debug trace. + * + * @param prefix The file name prefix. + * @return The debug trace value that corresponds to given prefix. + * @throws IllegalArgumentException Thrown if prefix isn't supported. + */ + public static DebugTrace getVertexDebugTraceForPrefix(String prefix) { + if (prefix.equals(PREFIX_TRACE_REGULAR)) { + return DebugTrace.VERTEX_REGULAR; + } else if (prefix.equals(PREFIX_TRACE_EXCEPTION)) { + return DebugTrace.VERTEX_EXCEPTION; + } else if (prefix.equals(PREFIX_TRACE_VERTEX)) { + return DebugTrace.INTEGRITY_VERTEX; + } else if (prefix.equals(PREFIX_TRACE_MESSAGE)) { + return DebugTrace.INTEGRITY_MESSAGE_SINGLE_VERTEX; + } else { + throw new IllegalArgumentException("Prefix not supported"); + } + } + + /** + * Returns the root directory of the trace files for the given job. + * + * @param jobId The job id of the job. + * @return The root path for storing traces for the job. 
+ */ + public static String getTraceFileRoot(String jobId) { + return String.format("%s/%s", DebuggerUtils.TRACE_ROOT, jobId); + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/8675c84a/giraph-debugger/src/main/java/org/apache/giraph/debugger/utils/ExceptionWrapper.java ---------------------------------------------------------------------- diff --git a/giraph-debugger/src/main/java/org/apache/giraph/debugger/utils/ExceptionWrapper.java b/giraph-debugger/src/main/java/org/apache/giraph/debugger/utils/ExceptionWrapper.java new file mode 100644 index 0000000..cc0598e --- /dev/null +++ b/giraph-debugger/src/main/java/org/apache/giraph/debugger/utils/ExceptionWrapper.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.debugger.utils; + +import java.io.IOException; +import java.io.InputStream; + +import org.apache.giraph.debugger.Scenario.Exception; + +import com.google.protobuf.GeneratedMessage; + +/** + * Wrapper class around {@link org.apache.giraph.debugger.Scenario.Exception} + * protocol buffer. + * + * author semihsalihoglu + */ +public class ExceptionWrapper extends BaseWrapper { + /** + * The error message of the exception. 
+ */ + private String errorMessage = ""; + /** + * The stack trace string of the exception. + */ + private String stackTrace = ""; + + /** + * Default constructor. + */ + public ExceptionWrapper() { + } + + /** + * Constructor with an error message and stack trace. + * + * @param errorMessage + * The error message of the exception. + * @param stackTrace + * The stack trace string obtained from + * {@link java.lang.Exception#getStackTrace()}. + */ + public ExceptionWrapper(String errorMessage, String stackTrace) { + this.errorMessage = errorMessage; + this.stackTrace = stackTrace; + } + + @Override + public String toString() { + StringBuilder stringBuilder = new StringBuilder(); + stringBuilder.append("errorMessage: " + getErrorMessage()); + stringBuilder.append("\nstackTrace: " + getStackTrace()); + return stringBuilder.toString(); + } + + public String getErrorMessage() { + // We append with "" to guard against null pointer exceptions + return "" + errorMessage; + } + + public String getStackTrace() { + // We append with "" to guard against null pointer exceptions + return "" + stackTrace; + } + + @Override + public GeneratedMessage buildProtoObject() { + Exception.Builder exceptionBuilder = Exception.newBuilder(); + exceptionBuilder.setMessage(getErrorMessage()); + exceptionBuilder.setStackTrace(getStackTrace()); + return exceptionBuilder.build(); + } + + @Override + public GeneratedMessage parseProtoFromInputStream(InputStream inputStream) + throws IOException { + return Exception.parseFrom(inputStream); + } + + @Override + public void loadFromProto(GeneratedMessage generatedMessage) + throws ClassNotFoundException, IOException, InstantiationException, + IllegalAccessException { + Exception exceptionProto = (Exception) generatedMessage; + this.errorMessage = exceptionProto.getMessage(); + this.stackTrace = exceptionProto.getStackTrace(); + } + + public void setErrorMessage(String errorMessage) { + // We append "" to guard against null pointer exceptions + 
this.errorMessage = "" + errorMessage; + } + + public void setStackTrace(String stackTrace) { + // We append "" to guard against null pointer exceptions + this.stackTrace = "" + stackTrace; + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/8675c84a/giraph-debugger/src/main/java/org/apache/giraph/debugger/utils/GiraphMasterScenarioWrapper.java ---------------------------------------------------------------------- diff --git a/giraph-debugger/src/main/java/org/apache/giraph/debugger/utils/GiraphMasterScenarioWrapper.java b/giraph-debugger/src/main/java/org/apache/giraph/debugger/utils/GiraphMasterScenarioWrapper.java new file mode 100644 index 0000000..0831adc --- /dev/null +++ b/giraph-debugger/src/main/java/org/apache/giraph/debugger/utils/GiraphMasterScenarioWrapper.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.giraph.debugger.utils; + +import java.io.IOException; +import java.io.InputStream; + +import org.apache.giraph.debugger.Scenario.CommonVertexMasterContext; +import org.apache.giraph.debugger.Scenario.Exception; +import org.apache.giraph.debugger.Scenario.GiraphMasterScenario; + +import com.google.protobuf.GeneratedMessage; + +/** + * Wrapper class around + * {@link org.apache.giraph.debugger.Scenario.GiraphMasterScenario} protocol + * buffer. + * + * author semihsalihoglu + */ +public class GiraphMasterScenarioWrapper extends BaseWrapper { + /** + * The MasterCompute class under debugging. + */ + private String masterClassUnderTest; + /** + * The common wrapper instance. + */ + private CommonVertexMasterContextWrapper commonVertexMasterContextWrapper = + null; + /** + * The exception wrapper instance. + */ + private ExceptionWrapper exceptionWrapper = null; + + /** + * Default constructor. + */ + public GiraphMasterScenarioWrapper() { + } + + /** + * Constructor with a MasterCompute class name. + * + * @param masterClassUnderTest The MasterCompute class name. 
+ */ + public GiraphMasterScenarioWrapper(String masterClassUnderTest) { + this.masterClassUnderTest = masterClassUnderTest; + this.commonVertexMasterContextWrapper = new + CommonVertexMasterContextWrapper(); + this.exceptionWrapper = null; + } + + public String getMasterClassUnderTest() { + return masterClassUnderTest; + } + + public CommonVertexMasterContextWrapper getCommonVertexMasterContextWrapper() + { + return commonVertexMasterContextWrapper; + } + + public void setCommonVertexMasterContextWrapper( + CommonVertexMasterContextWrapper commonVertexMasterContextWrapper) { + this.commonVertexMasterContextWrapper = commonVertexMasterContextWrapper; + } + + public ExceptionWrapper getExceptionWrapper() { + return exceptionWrapper; + } + + public void setExceptionWrapper(ExceptionWrapper exceptionWrapper) { + this.exceptionWrapper = exceptionWrapper; + } + + /** + * Checks if this has an exception wrapper. + * @return True if this has an exception wrapper. + */ + public boolean hasExceptionWrapper() { + return exceptionWrapper != null; + } + + @Override + public GeneratedMessage buildProtoObject() { + GiraphMasterScenario.Builder giraphMasterScenarioBuilder = + GiraphMasterScenario.newBuilder(); + giraphMasterScenarioBuilder.setMasterClassUnderTest(masterClassUnderTest); + giraphMasterScenarioBuilder + .setCommonContext((CommonVertexMasterContext) + commonVertexMasterContextWrapper.buildProtoObject()); + if (hasExceptionWrapper()) { + giraphMasterScenarioBuilder.setException((Exception) exceptionWrapper + .buildProtoObject()); + } + return giraphMasterScenarioBuilder.build(); + } + + @Override + public GeneratedMessage parseProtoFromInputStream(InputStream inputStream) + throws IOException { + return GiraphMasterScenario.parseFrom(inputStream); + } + + @Override + public void loadFromProto(GeneratedMessage protoObject) + throws ClassNotFoundException, IOException, InstantiationException, + IllegalAccessException { + GiraphMasterScenario giraphMasterScenario = 
(GiraphMasterScenario) + protoObject; + this.masterClassUnderTest = giraphMasterScenario.getMasterClassUnderTest(); + this.commonVertexMasterContextWrapper = new + CommonVertexMasterContextWrapper(); + this.commonVertexMasterContextWrapper.loadFromProto(giraphMasterScenario + .getCommonContext()); + if (giraphMasterScenario.hasException()) { + this.exceptionWrapper = new ExceptionWrapper(); + this.exceptionWrapper.loadFromProto(giraphMasterScenario.getException()); + } + } + + @Override + public String toString() { + StringBuilder stringBuilder = new StringBuilder(); + stringBuilder.append("masterClassUnderTest: " + masterClassUnderTest); + stringBuilder.append("\n" + commonVertexMasterContextWrapper); // null-safe + stringBuilder.append("\nhasExceptionWrapper: " + hasExceptionWrapper()); + if (hasExceptionWrapper()) { + stringBuilder.append("\n" + exceptionWrapper); + } + return stringBuilder.toString(); + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/8675c84a/giraph-debugger/src/main/java/org/apache/giraph/debugger/utils/GiraphVertexScenarioWrapper.java ---------------------------------------------------------------------- diff --git a/giraph-debugger/src/main/java/org/apache/giraph/debugger/utils/GiraphVertexScenarioWrapper.java b/giraph-debugger/src/main/java/org/apache/giraph/debugger/utils/GiraphVertexScenarioWrapper.java new file mode 100644 index 0000000..0f36605 --- /dev/null +++ b/giraph-debugger/src/main/java/org/apache/giraph/debugger/utils/GiraphVertexScenarioWrapper.java @@ -0,0 +1,819 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.debugger.utils; + +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.Collection; + +import org.apache.giraph.debugger.Scenario.CommonVertexMasterContext; +import org.apache.giraph.debugger.Scenario.Exception; +import org.apache.giraph.debugger.Scenario.GiraphVertexScenario; +import org.apache.giraph.debugger.Scenario.GiraphVertexScenario.VertexContext; +import org.apache.giraph.debugger.Scenario.GiraphVertexScenario.VertexContext.Neighbor; +import org.apache.giraph.debugger.Scenario.GiraphVertexScenario.VertexContext.OutgoingMessage; +import org.apache.giraph.debugger.Scenario.GiraphVertexScenario.VertexScenarioClasses; +import org.apache.giraph.graph.Computation; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; + +import com.google.protobuf.GeneratedMessage; + +/** + * Wrapper class around + * {@link org.apache.giraph.debugger.Scenario.GiraphVertexScenario} protocol + * buffer. In {@link org.apache.giraph.debugger.Scenario.GiraphVertexScenario} + * most fields are stored as serialized byte arrays and this class gives them + * access through the java classes that those byte arrays serialize. + * + * @param <I> + * vertex ID class. + * @param <V> + * vertex value class. + * @param <E> + * edge value class. + * @param <M1> + * incoming message class. + * @param <M2> + * outgoing message class. 
+ * + * author Brian Truong + */ +@SuppressWarnings("rawtypes") +public class GiraphVertexScenarioWrapper<I extends WritableComparable, V extends + Writable, E extends Writable, M1 extends Writable, M2 extends Writable> + extends BaseWrapper { + + /** + * Vertex scenario classes wrapper instance. + */ + private VertexScenarioClassesWrapper vertexScenarioClassesWrapper = null; + /** + * Vertex context wrapper instance. + */ + private VertexContextWrapper contextWrapper = null; + /** + * Exception wrapper instance. + */ + private ExceptionWrapper exceptionWrapper = null; + + /** + * Empty constructor to be used for loading from HDFS. + */ + public GiraphVertexScenarioWrapper() { + } + + /** + * Constructor with classes. + * + * @param classUnderTest The Computation class under test. + * @param vertexIdClass The vertex id class. + * @param vertexValueClass The vertex value class. + * @param edgeValueClass The edge value class. + * @param incomingMessageClass The incoming message class. + * @param outgoingMessageClass The outgoing message class. + */ + public GiraphVertexScenarioWrapper( + Class<? extends Computation<I, V, E, M1, M2>> classUnderTest, + Class<I> vertexIdClass, Class<V> vertexValueClass, Class<E> edgeValueClass, + Class<M1> incomingMessageClass, Class<M2> outgoingMessageClass) { + this.vertexScenarioClassesWrapper = new VertexScenarioClassesWrapper( + classUnderTest, vertexIdClass, vertexValueClass, edgeValueClass, + incomingMessageClass, outgoingMessageClass); + this.contextWrapper = new VertexContextWrapper(); + } + + public VertexContextWrapper getContextWrapper() { + return contextWrapper; + } + + public void setContextWrapper(VertexContextWrapper contextWrapper) { + this.contextWrapper = contextWrapper; + } + + /** + * Checks if this has an exception wrapper. + * @return True if this has an exception wrapper. 
+ */ + public boolean hasExceptionWrapper() { + return exceptionWrapper != null; + } + + public ExceptionWrapper getExceptionWrapper() { + return exceptionWrapper; + } + + public void setExceptionWrapper(ExceptionWrapper exceptionWrapper) { + this.exceptionWrapper = exceptionWrapper; + } + + @Override + public String toString() { + StringBuilder stringBuilder = new StringBuilder(); + stringBuilder.append(super.toString()); + stringBuilder.append("\n" + vertexScenarioClassesWrapper); // null-safe + stringBuilder.append("\n" + contextWrapper); + stringBuilder.append("\nhasExceptionWrapper: " + hasExceptionWrapper()); + if (hasExceptionWrapper()) { + stringBuilder.append("\n" + exceptionWrapper); + } + return stringBuilder.toString(); + } + + /** + * Wrapper class around + * {@link + * org.apache.giraph.debugger.Scenario.GiraphVertexScenario.VertexContext} + * protocol buffer. + * + * author semihsalihoglu + */ + public class VertexContextWrapper extends BaseWrapper { + /** + * Vertex master context wrapper instance. + */ + private CommonVertexMasterContextWrapper commonVertexMasterContextWrapper; + /** + * Reference to the vertex id. + */ + private I vertexIdWrapper; + /** + * Reference to the vertex value before the computation. + */ + private V vertexValueBeforeWrapper; + /** + * Reference to the vertex value after the computation. + */ + private V vertexValueAfterWrapper; + /** + * List of incoming messages. + */ + private ArrayList<M1> inMsgsWrapper; + /** + * List of neighbor vertices. + */ + private ArrayList<NeighborWrapper> neighborsWrapper; + /** + * List of outgoing messages. + */ + private ArrayList<OutgoingMessageWrapper> outMsgsWrapper; + + /** + * Default constructor. + */ + public VertexContextWrapper() { + reset(); + } + + /** + * Initializes/resets this instance. 
+ */ + public void reset() { + this.commonVertexMasterContextWrapper = new + CommonVertexMasterContextWrapper(); + this.vertexIdWrapper = null; + this.vertexValueBeforeWrapper = null; + this.vertexValueAfterWrapper = null; + this.inMsgsWrapper = new ArrayList<M1>(); + this.neighborsWrapper = new ArrayList<NeighborWrapper>(); + this.outMsgsWrapper = new ArrayList<OutgoingMessageWrapper>(); + } + + public CommonVertexMasterContextWrapper + getCommonVertexMasterContextWrapper() { + return commonVertexMasterContextWrapper; + } + + public void setCommonVertexMasterContextWrapper( + CommonVertexMasterContextWrapper commonVertexMasterContextWrapper) { + this.commonVertexMasterContextWrapper = commonVertexMasterContextWrapper; + } + + public I getVertexIdWrapper() { + return vertexIdWrapper; + } + + public void setVertexIdWrapper(I vertexId) { + this.vertexIdWrapper = vertexId; + } + + public V getVertexValueBeforeWrapper() { + return vertexValueBeforeWrapper; + } + + public V getVertexValueAfterWrapper() { + return vertexValueAfterWrapper; + } + + public void setVertexValueBeforeWrapper(V vertexValueBefore) { + // Because Giraph does not create new objects for writables, we need + // to make a clone them to get a copy of the objects. Otherwise, if + // we call setVertexValueBeforeWrapper and then setVertexValueAfterWrapper + // both of our copies end up pointing to the same object (in this case to + // the value passed to setVertexValueAfterWrapper, because it was called + // later). + this.vertexValueBeforeWrapper = DebuggerUtils.makeCloneOf( + vertexValueBefore, getVertexScenarioClassesWrapper().vertexValueClass); + } + + public void setVertexValueAfterWrapper(V vertexValueAfter) { + // See the explanation for making a clone inside + // setVertexValueBeforeWrapper + this.vertexValueAfterWrapper = DebuggerUtils.makeCloneOf( + vertexValueAfter, getVertexScenarioClassesWrapper().vertexValueClass); + } + + /** + * Captures an incoming message by keeping a clone. 
+ * + * @param message The message to capture. + */ + public void addIncomingMessageWrapper(M1 message) { + // See the explanation for making a clone inside + // setVertexValueBeforeWrapper + inMsgsWrapper.add(DebuggerUtils.makeCloneOf(message, + getVertexScenarioClassesWrapper().incomingMessageClass)); + } + + public Collection<M1> getIncomingMessageWrappers() { + return inMsgsWrapper; + } + + /** + * Captures an outgoing message by keeping a clone. + * + * @param receiverId The vertex id that receives the message. + * @param message The message being sent to be captured. + */ + public void addOutgoingMessageWrapper(I receiverId, M2 message) { + // See the explanation for making a clone inside + // setVertexValueBeforeWrapper + outMsgsWrapper.add(new OutgoingMessageWrapper(DebuggerUtils.makeCloneOf( + receiverId, getVertexScenarioClassesWrapper().vertexIdClass), + DebuggerUtils.makeCloneOf(message, + getVertexScenarioClassesWrapper().outgoingMessageClass))); + } + + public Collection<OutgoingMessageWrapper> getOutgoingMessageWrappers() { + return outMsgsWrapper; + } + + /** + * Adds a neighbor vertex. + * + * @param neighborId The neighbor vertex id. + * @param edgeValue The value of the edge that connects to the neighbor. 
+ */ + public void addNeighborWrapper(I neighborId, E edgeValue) { + // See the explanation for making a clone inside + // setVertexValueBeforeWrapper + neighborsWrapper.add(new NeighborWrapper(DebuggerUtils.makeCloneOf( + neighborId, getVertexScenarioClassesWrapper().vertexIdClass), + DebuggerUtils.makeCloneOf(edgeValue, + getVertexScenarioClassesWrapper().edgeValueClass))); + } + + public Collection<NeighborWrapper> getNeighborWrappers() { + return neighborsWrapper; + } + + @Override + public String toString() { + StringBuilder stringBuilder = new StringBuilder(); + stringBuilder.append(commonVertexMasterContextWrapper.toString()); + stringBuilder.append("\nvertexId: " + getVertexIdWrapper()); + stringBuilder.append("\nvertexValueBefore: " + + getVertexValueBeforeWrapper()); + stringBuilder.append("\nvertexValueAfter: " + + getVertexValueAfterWrapper()); + stringBuilder.append("\nnumNeighbors: " + getNeighborWrappers().size()); + + for (NeighborWrapper neighborWrapper : getNeighborWrappers()) { + stringBuilder.append("\n" + neighborWrapper.toString()); + } + + for (M1 incomingMesage : getIncomingMessageWrappers()) { + stringBuilder.append("\nincoming message: " + incomingMesage); + } + + stringBuilder.append("\nnumOutgoingMessages: " + + getOutgoingMessageWrappers().size()); + for (OutgoingMessageWrapper outgoingMessageWrapper : + getOutgoingMessageWrappers()) { + stringBuilder.append("\n" + outgoingMessageWrapper); + } + return stringBuilder.toString(); + } + + /** + * Wrapper around scenario.giraphscenerio.neighbor (in scenario.proto). + * + * author Brian Truong + */ + public class NeighborWrapper extends BaseWrapper { + + /** + * Neighbor vertex id. + */ + private I nbrId; + /** + * Value of the edge that points to the neighbor. + */ + private E edgeValue; + + /** + * Constructor with the fields. + * + * @param nbrId Neighbor vertex id. + * @param edgeValue Value of the edge that points to the neighbor. 
+ */ + public NeighborWrapper(I nbrId, E edgeValue) { + this.nbrId = nbrId; + this.edgeValue = edgeValue; + } + + /** + * Default constructor. + */ + public NeighborWrapper() { + } + + public I getNbrId() { + return nbrId; + } + + public E getEdgeValue() { + return edgeValue; + } + + @Override + public String toString() { + return "neighbor: nbrId: " + nbrId + " edgeValue: " + edgeValue; + } + + @Override + public GeneratedMessage buildProtoObject() { + Neighbor.Builder neighborBuilder = Neighbor.newBuilder(); + neighborBuilder.setNeighborId(toByteString(nbrId)); + if (edgeValue != null) { + neighborBuilder.setEdgeValue(toByteString(edgeValue)); + } else { + neighborBuilder.clearEdgeValue(); + } + return neighborBuilder.build(); + } + + @Override + public GeneratedMessage parseProtoFromInputStream(InputStream inputStream) + throws IOException { + return Neighbor.parseFrom(inputStream); + } + + @Override + public void loadFromProto(GeneratedMessage protoObject) + throws ClassNotFoundException, IOException, InstantiationException, + IllegalAccessException { + Neighbor neighbor = (Neighbor) protoObject; + this.nbrId = DebuggerUtils + .newInstance(vertexScenarioClassesWrapper.vertexIdClass); + fromByteString(neighbor.getNeighborId(), this.nbrId); + + if (neighbor.hasEdgeValue()) { + this.edgeValue = DebuggerUtils + .newInstance(vertexScenarioClassesWrapper.edgeValueClass); + fromByteString(neighbor.getEdgeValue(), this.edgeValue); + } else { + this.edgeValue = null; + } + } + } + + /** + * Class for capturing outgoing message. + */ + public class OutgoingMessageWrapper extends BaseWrapper { + /** + * Destination vertex id. + */ + private I destinationId; + /** + * Outgoing message. + */ + private M2 message; + + /** + * Constructor with the field values. + * + * @param destinationId Destination vertex id. + * @param message Outgoing message. 
+ */ + public OutgoingMessageWrapper(I destinationId, M2 message) { + this.setDestinationId(destinationId); + this.setMessage(message); + } + + /** + * Default constructor. + */ + public OutgoingMessageWrapper() { + } + + public I getDestinationId() { + return destinationId; + } + + public M2 getMessage() { + return message; + } + + @Override + public String toString() { + return "outgoingMessage: destinationId: " + getDestinationId() + + " message: " + getMessage(); + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + + (getDestinationId() == null ? 0 : getDestinationId().hashCode()); + result = prime * result + (getMessage() == null ? 0 : + getMessage().hashCode()); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + @SuppressWarnings("unchecked") + OutgoingMessageWrapper other = (OutgoingMessageWrapper) obj; + if (getDestinationId() == null) { + if (other.getDestinationId() != null) { + return false; + } + } else if (!getDestinationId().equals(other.getDestinationId())) { + return false; + } + if (getMessage() == null) { + if (other.getMessage() != null) { + return false; + } + } else if (!getMessage().equals(other.getMessage())) { + return false; + } + return true; + } + + @Override + public GeneratedMessage buildProtoObject() { + OutgoingMessage.Builder outgoingMessageBuilder = OutgoingMessage + .newBuilder(); + outgoingMessageBuilder.setMsgData(toByteString(this.getMessage())); + outgoingMessageBuilder + .setDestinationId(toByteString(this.getDestinationId())); + return outgoingMessageBuilder.build(); + } + + @Override + public GeneratedMessage parseProtoFromInputStream(InputStream inputStream) + throws IOException { + return OutgoingMessage.parseFrom(inputStream); + } + + @Override + public void loadFromProto(GeneratedMessage 
generatedMessage) + throws ClassNotFoundException, IOException, InstantiationException, + IllegalAccessException { + OutgoingMessage outgoingMessageProto = (OutgoingMessage) + generatedMessage; + this.setDestinationId(DebuggerUtils + .newInstance(getVertexScenarioClassesWrapper().vertexIdClass)); + fromByteString(outgoingMessageProto.getDestinationId(), + getDestinationId()); + this.setMessage(DebuggerUtils + .newInstance(getVertexScenarioClassesWrapper().outgoingMessageClass)); + fromByteString(outgoingMessageProto.getMsgData(), this.getMessage()); + } + + /** + * @param destinationId the destinationId to set + */ + public void setDestinationId(I destinationId) { + this.destinationId = destinationId; + } + + /** + * @param message the message to set + */ + public void setMessage(M2 message) { + this.message = message; + } + } + + @Override + public GeneratedMessage buildProtoObject() { + VertexContext.Builder contextBuilder = VertexContext.newBuilder(); + contextBuilder + .setCommonContext((CommonVertexMasterContext) + commonVertexMasterContextWrapper.buildProtoObject()); + contextBuilder.setVertexId(toByteString(vertexIdWrapper)); + if (vertexValueBeforeWrapper != null) { + contextBuilder + .setVertexValueBefore(toByteString(vertexValueBeforeWrapper)); + } + if (vertexValueAfterWrapper != null) { + contextBuilder + .setVertexValueAfter(toByteString(vertexValueAfterWrapper)); + } + + for (GiraphVertexScenarioWrapper<I, V, E, M1, M2>.VertexContextWrapper. 
+ NeighborWrapper neighborWrapper : neighborsWrapper) { + contextBuilder.addNeighbor((Neighbor) neighborWrapper + .buildProtoObject()); + } + + for (M1 msg : inMsgsWrapper) { + contextBuilder.addInMessage(toByteString(msg)); + } + + for (OutgoingMessageWrapper outgoingMessageWrapper : outMsgsWrapper) { + contextBuilder.addOutMessage((OutgoingMessage) outgoingMessageWrapper + .buildProtoObject()); + } + + return contextBuilder.build(); + } + + @Override + public GeneratedMessage parseProtoFromInputStream(InputStream inputStream) + throws IOException { + return VertexContext.parseFrom(inputStream); + } + + @Override + public void loadFromProto(GeneratedMessage generatedMessage) + throws ClassNotFoundException, IOException, InstantiationException, + IllegalAccessException { + VertexContext context = (VertexContext) generatedMessage; + + CommonVertexMasterContextWrapper vertexMasterContextWrapper = new + CommonVertexMasterContextWrapper(); + vertexMasterContextWrapper + .loadFromProto(context.getCommonContext()); + this.commonVertexMasterContextWrapper = vertexMasterContextWrapper; + + I vertexId = DebuggerUtils + .newInstance(getVertexScenarioClassesWrapper().vertexIdClass); + fromByteString(context.getVertexId(), vertexId); + this.vertexIdWrapper = vertexId; + + V vertexValueBefore = DebuggerUtils + .newInstance(getVertexScenarioClassesWrapper().vertexValueClass); + fromByteString(context.getVertexValueBefore(), vertexValueBefore); + this.vertexValueBeforeWrapper = vertexValueBefore; + if (context.hasVertexValueAfter()) { + V vertexValueAfter = DebuggerUtils + .newInstance(getVertexScenarioClassesWrapper().vertexValueClass); + fromByteString(context.getVertexValueAfter(), vertexValueAfter); + this.vertexValueAfterWrapper = vertexValueAfter; + } + + for (Neighbor neighbor : context.getNeighborList()) { + NeighborWrapper neighborWrapper = new NeighborWrapper(); + neighborWrapper.loadFromProto(neighbor); + this.neighborsWrapper.add(neighborWrapper); + } + for (int i = 
0; i < context.getInMessageCount(); i++) { + M1 msg = DebuggerUtils + .newInstance(getVertexScenarioClassesWrapper().incomingMessageClass); + fromByteString(context.getInMessage(i), msg); + this.addIncomingMessageWrapper(msg); + } + + for (OutgoingMessage outgoingMessageProto : context.getOutMessageList()) { + OutgoingMessageWrapper outgoingMessageWrapper = new + OutgoingMessageWrapper(); + outgoingMessageWrapper.loadFromProto(outgoingMessageProto); + this.outMsgsWrapper.add(outgoingMessageWrapper); + } + } + } + + /** + * Class for capturing the parameter classes used for Giraph Computation. + */ + public class VertexScenarioClassesWrapper extends + BaseScenarioAndIntegrityWrapper<I> { + /** + * The Computation class. + */ + private Class<?> classUnderTest; + /** + * The vertex value class. + */ + private Class<V> vertexValueClass; + /** + * The edge value class. + */ + private Class<E> edgeValueClass; + /** + * The incoming message class. + */ + private Class<M1> incomingMessageClass; + /** + * The outgoing message class. + */ + private Class<M2> outgoingMessageClass; + + /** + * Default constructor. + */ + public VertexScenarioClassesWrapper() { + } + + /** + * Constructor with field values. + * + * @param classUnderTest Computation class. + * @param vertexIdClass Vertex id class. + * @param vertexValueClass Vertex value class. + * @param edgeValueClass Edge value class. + * @param incomingMessageClass Incoming message class. + * @param outgoingMessageClass Outgoing message class. + */ + public VertexScenarioClassesWrapper( + Class<? 
extends Computation<I, V, E, M1, M2>> classUnderTest, + Class<I> vertexIdClass, Class<V> vertexValueClass, + Class<E> edgeValueClass, Class<M1> incomingMessageClass, + Class<M2> outgoingMessageClass) { + super(vertexIdClass); + this.classUnderTest = classUnderTest; + this.vertexValueClass = vertexValueClass; + this.edgeValueClass = edgeValueClass; + this.incomingMessageClass = incomingMessageClass; + this.outgoingMessageClass = outgoingMessageClass; + } + + public Class<?> getClassUnderTest() { + return classUnderTest; + } + + public Class<V> getVertexValueClass() { + return vertexValueClass; + } + + public Class<E> getEdgeValueClass() { + return edgeValueClass; + } + + public Class<M1> getIncomingMessageClass() { + return incomingMessageClass; + } + + public Class<M2> getOutgoingMessageClass() { + return outgoingMessageClass; + } + + @Override + public GeneratedMessage buildProtoObject() { + VertexScenarioClasses.Builder vertexScenarioClassesBuilder = + VertexScenarioClasses.newBuilder(); + vertexScenarioClassesBuilder.setClassUnderTest(getClassUnderTest() + .getName()); + vertexScenarioClassesBuilder.setVertexIdClass(getVertexIdClass() + .getName()); + vertexScenarioClassesBuilder.setVertexValueClass(getVertexValueClass() + .getName()); + vertexScenarioClassesBuilder.setEdgeValueClass(getEdgeValueClass() + .getName()); + vertexScenarioClassesBuilder + .setIncomingMessageClass(getIncomingMessageClass().getName()); + vertexScenarioClassesBuilder + .setOutgoingMessageClass(getOutgoingMessageClass().getName()); + return vertexScenarioClassesBuilder.build(); + } + + @Override + public GeneratedMessage parseProtoFromInputStream(InputStream inputStream) + throws IOException { + return VertexScenarioClasses.parseFrom(inputStream); + } + + @SuppressWarnings("unchecked") + @Override + public void loadFromProto(GeneratedMessage generatedMessage) + throws ClassNotFoundException, IOException, InstantiationException, + IllegalAccessException { + VertexScenarioClasses 
vertexScenarioClass = (VertexScenarioClasses) + generatedMessage; + Class<?> clazz = Class.forName(vertexScenarioClass.getClassUnderTest()); + this.classUnderTest = castClassToUpperBound(clazz, Computation.class); + this.vertexIdClass = (Class<I>) castClassToUpperBound( + Class.forName(vertexScenarioClass.getVertexIdClass()), + WritableComparable.class); + this.vertexValueClass = (Class<V>) castClassToUpperBound( + Class.forName(vertexScenarioClass.getVertexValueClass()), + Writable.class); + this.edgeValueClass = (Class<E>) castClassToUpperBound( + Class.forName(vertexScenarioClass.getEdgeValueClass()), Writable.class); + this.incomingMessageClass = (Class<M1>) castClassToUpperBound( + Class.forName(vertexScenarioClass.getIncomingMessageClass()), + Writable.class); + this.outgoingMessageClass = (Class<M2>) castClassToUpperBound( + Class.forName(vertexScenarioClass.getOutgoingMessageClass()), + Writable.class); + } + + @Override + public String toString() { + StringBuilder stringBuilder = new StringBuilder(); + stringBuilder.append(super.toString()); + stringBuilder.append("\nclassUnderTest: " + + getClassUnderTest().getCanonicalName()); + stringBuilder.append("\nvertexValueClass: " + + getVertexValueClass().getCanonicalName()); + stringBuilder.append("\nincomingMessageClass: " + + getIncomingMessageClass().getCanonicalName()); + stringBuilder.append("\noutgoingMessageClass: " + + getOutgoingMessageClass().getCanonicalName()); + return stringBuilder.toString(); + } + + } + + @Override + public void loadFromProto(GeneratedMessage generatedMessage) + throws ClassNotFoundException, IOException, InstantiationException, + IllegalAccessException { + GiraphVertexScenario giraphScenario = (GiraphVertexScenario) + generatedMessage; + this.vertexScenarioClassesWrapper = new VertexScenarioClassesWrapper(); + this.vertexScenarioClassesWrapper.loadFromProto(giraphScenario + .getVertexScenarioClasses()); + + this.contextWrapper = new VertexContextWrapper(); + 
this.contextWrapper.loadFromProto(giraphScenario.getContext()); + + if (giraphScenario.hasException()) { + this.exceptionWrapper = new ExceptionWrapper(); + this.exceptionWrapper.loadFromProto(giraphScenario.getException()); + } + } + + @Override + public GeneratedMessage buildProtoObject() { + GiraphVertexScenario.Builder giraphScenarioBuilder = GiraphVertexScenario + .newBuilder(); + giraphScenarioBuilder + .setVertexScenarioClasses((VertexScenarioClasses) + vertexScenarioClassesWrapper.buildProtoObject()); + giraphScenarioBuilder.setContext((VertexContext) contextWrapper + .buildProtoObject()); + if (hasExceptionWrapper()) { + giraphScenarioBuilder.setException((Exception) exceptionWrapper + .buildProtoObject()); + } + GiraphVertexScenario giraphScenario = giraphScenarioBuilder.build(); + return giraphScenario; + } + + @Override + public GeneratedMessage parseProtoFromInputStream(InputStream inputStream) + throws IOException { + return GiraphVertexScenario.parseFrom(inputStream); + } + + public VertexScenarioClassesWrapper getVertexScenarioClassesWrapper() { + return vertexScenarioClassesWrapper; + } + + public void setVertexScenarioClassesWrapper( + VertexScenarioClassesWrapper vertexScenarioClassesWrapper) { + this.vertexScenarioClassesWrapper = vertexScenarioClassesWrapper; + } +}
