Author: nitay
Date: Sat Nov 17 03:12:58 2012
New Revision: 1410684
URL: http://svn.apache.org/viewvc?rev=1410684&view=rev
Log:
GIRAPH-416: MasterObserver for user post-application customization (nitay)
Added:
giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/IntNullNullNullVertex.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/io/IntNullNullNullTextInputFormat.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/master/
giraph/trunk/giraph/src/main/java/org/apache/giraph/master/DefaultMasterObserver.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/master/MasterObserver.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/master/package-info.java
giraph/trunk/giraph/src/test/java/org/apache/giraph/TestGiraphConfiguration.java
giraph/trunk/giraph/src/test/java/org/apache/giraph/TestMasterObserver.java
Modified:
giraph/trunk/CHANGELOG
giraph/trunk/giraph/src/main/java/org/apache/giraph/GiraphConfiguration.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/ImmutableClassesGiraphConfiguration.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceMaster.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/MasterThread.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java
Modified: giraph/trunk/CHANGELOG
URL:
http://svn.apache.org/viewvc/giraph/trunk/CHANGELOG?rev=1410684&r1=1410683&r2=1410684&view=diff
==============================================================================
--- giraph/trunk/CHANGELOG (original)
+++ giraph/trunk/CHANGELOG Sat Nov 17 03:12:58 2012
@@ -1,6 +1,8 @@
Giraph Change Log
Release 0.2.0 - unreleased
+ GIRAPH-416: MasterObserver for user post-application customization (nitay)
+
GIRAPH-427: Add committer information for Nitay Joffe to pom.xml (nitay)
GIRAPH-417: Serialize the graph/message cache into byte[] for
Modified:
giraph/trunk/giraph/src/main/java/org/apache/giraph/GiraphConfiguration.java
URL:
http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/GiraphConfiguration.java?rev=1410684&r1=1410683&r2=1410684&view=diff
==============================================================================
---
giraph/trunk/giraph/src/main/java/org/apache/giraph/GiraphConfiguration.java
(original)
+++
giraph/trunk/giraph/src/main/java/org/apache/giraph/GiraphConfiguration.java
Sat Nov 17 03:12:58 2012
@@ -28,9 +28,9 @@ import org.apache.giraph.graph.VertexOut
import org.apache.giraph.graph.VertexResolver;
import org.apache.giraph.graph.WorkerContext;
import org.apache.giraph.graph.partition.GraphPartitionerFactory;
-
import org.apache.giraph.graph.partition.Partition;
+import org.apache.giraph.master.MasterObserver;
import org.apache.hadoop.conf.Configuration;
/**
@@ -44,6 +44,9 @@ public class GiraphConfiguration extends
/** Class for Master - optional */
public static final String MASTER_COMPUTE_CLASS =
"giraph.masterComputeClass";
+ /** Classes for Observer Master - optional */
+ public static final String MASTER_OBSERVER_CLASSES =
+ "giraph.master.observers";
/** Vertex combiner class - optional */
public static final String VERTEX_COMBINER_CLASS =
"giraph.combinerClass";
@@ -695,6 +698,82 @@ public class GiraphConfiguration extends
}
/**
+ * Add a MasterObserver class (optional)
+ *
+ * @param masterObserverClass MasterObserver class to add.
+ */
+ public final void addMasterObserverClass(
+ Class<? extends MasterObserver> masterObserverClass) {
+ addToClasses(MASTER_OBSERVER_CLASSES, masterObserverClass,
+ MasterObserver.class);
+ }
+
+ /**
+ * Add a class to a property that is a list of classes. If the property does
+ * not exist it will be created.
+ *
+ * @param name String name of property.
+ * @param klass interface of the class being set.
+ * @param xface Class to add to the list.
+ */
+ public final void addToClasses(String name, Class<?> klass, Class<?> xface) {
+ if (!xface.isAssignableFrom(klass)) {
+ throw new RuntimeException(klass + " does not implement " +
+ xface.getName());
+ }
+ String value;
+ String klasses = get(name);
+ if (klasses == null) {
+ value = klass.getName();
+ } else {
+ value = klasses + "," + klass.getName();
+ }
+ set(name, value);
+ }
+
+ /**
+ * Set mapping from a key name to a list of classes.
+ *
+ * @param name String key name to use.
+ * @param xface interface of the classes being set.
+ * @param klasses Classes to set.
+ */
+ public final void setClasses(String name, Class<?> xface,
+ Class<?> ... klasses) {
+ String[] klassNames = new String[klasses.length];
+ for (int i = 0; i < klasses.length; ++i) {
+ Class<?> klass = klasses[i];
+ if (!xface.isAssignableFrom(klass)) {
+ throw new RuntimeException(klass + " does not implement " +
+ xface.getName());
+ }
+ klassNames[i] = klasses[i].getName();
+ }
+ setStrings(name, klassNames);
+ }
+
+ /**
+ * Get classes from a property that all implement a given interface.
+ *
+ * @param name String name of property to fetch.
+ * @param xface interface classes must implement.
+ * @param defaultValue If not found, return this
+ * @param <T> Generic type of interface class
+ * @return array of Classes implementing interface specified.
+ */
+ public final <T> Class<? extends T>[] getClassesOfType(String name,
+ Class<T> xface, Class<? extends T> ... defaultValue) {
+ Class<?>[] klasses = getClasses(name, defaultValue);
+ for (Class<?> klass : klasses) {
+ if (!xface.isAssignableFrom(klass)) {
+ throw new RuntimeException(klass + " is not assignable from " +
+ xface.getName());
+ }
+ }
+ return (Class<? extends T>[]) klasses;
+ }
+
+ /**
* Set the vertex output format class (optional)
*
* @param vertexOutputFormatClass Determines how graph is output
@@ -826,6 +905,15 @@ public class GiraphConfiguration extends
}
/**
+ * Get array of MasterObserver classes set in the configuration.
+ *
+ * @return array of MasterObserver classes.
+ */
+ public Class<? extends MasterObserver>[] getMasterObserverClasses() {
+ return getClassesOfType(MASTER_OBSERVER_CLASSES, MasterObserver.class);
+ }
+
+ /**
* Should we dump metrics at the end of the job.
*
* @return true if we should dump metrics, false otherwise.
Modified:
giraph/trunk/giraph/src/main/java/org/apache/giraph/ImmutableClassesGiraphConfiguration.java
URL:
http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/ImmutableClassesGiraphConfiguration.java?rev=1410684&r1=1410683&r2=1410684&view=diff
==============================================================================
---
giraph/trunk/giraph/src/main/java/org/apache/giraph/ImmutableClassesGiraphConfiguration.java
(original)
+++
giraph/trunk/giraph/src/main/java/org/apache/giraph/ImmutableClassesGiraphConfiguration.java
Sat Nov 17 03:12:58 2012
@@ -18,7 +18,6 @@
package org.apache.giraph;
-import java.util.List;
import org.apache.giraph.graph.AggregatorWriter;
import org.apache.giraph.graph.Combiner;
import org.apache.giraph.graph.DefaultMasterCompute;
@@ -38,6 +37,7 @@ import org.apache.giraph.graph.partition
import org.apache.giraph.graph.partition.Partition;
import org.apache.giraph.graph.partition.PartitionStats;
import org.apache.giraph.graph.partition.SimplePartition;
+import org.apache.giraph.master.MasterObserver;
import org.apache.giraph.utils.ExtendedByteArrayDataInput;
import org.apache.giraph.utils.ExtendedByteArrayDataOutput;
import org.apache.giraph.utils.ExtendedDataInput;
@@ -51,6 +51,8 @@ import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.util.Progressable;
+import java.util.List;
+
/**
* The classes set here are immutable, the remaining configuration is mutable.
* Classes are immutable and final to provide the best performance for
@@ -457,6 +459,20 @@ public class ImmutableClassesGiraphConfi
}
/**
+ * Create array of MasterObservers.
+ *
+ * @return Instantiated array of MasterObservers.
+ */
+ public MasterObserver[] createMasterObservers() {
+ Class<? extends MasterObserver>[] klasses = getMasterObserverClasses();
+ MasterObserver[] objects = new MasterObserver[klasses.length];
+ for (int i = 0; i < klasses.length; ++i) {
+ objects[i] = ReflectionUtils.newInstance(klasses[i], this);
+ }
+ return objects;
+ }
+
+ /**
* Get the user's subclassed edge value class.
*
* @return User's vertex edge value class
Modified:
giraph/trunk/giraph/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java
URL:
http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java?rev=1410684&r1=1410683&r2=1410684&view=diff
==============================================================================
---
giraph/trunk/giraph/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java
(original)
+++
giraph/trunk/giraph/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java
Sat Nov 17 03:12:58 2012
@@ -18,14 +18,14 @@
package org.apache.giraph.bsp;
-import java.io.IOException;
-
import org.apache.giraph.graph.MasterAggregatorHandler;
import org.apache.giraph.graph.MasterInfo;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.zookeeper.KeeperException;
+import java.io.IOException;
+
/**
* At most, there will be one active master at a time, but many threads can
* be trying to be the active master.
@@ -122,4 +122,14 @@ public interface CentralizedServiceMaste
* @return Master aggregator handler
*/
MasterAggregatorHandler getAggregatorHandler();
+
+ /**
+ * Superstep has finished.
+ */
+ void postSuperstep();
+
+ /**
+ * Application has finished.
+ */
+ void postApplication();
}
Modified:
giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceMaster.java
URL:
http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceMaster.java?rev=1410684&r1=1410683&r2=1410684&view=diff
==============================================================================
---
giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceMaster.java
(original)
+++
giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceMaster.java
Sat Nov 17 03:12:58 2012
@@ -34,6 +34,7 @@ import org.apache.giraph.graph.partition
import org.apache.giraph.graph.partition.PartitionOwner;
import org.apache.giraph.graph.partition.PartitionStats;
import org.apache.giraph.graph.partition.PartitionUtils;
+import org.apache.giraph.master.MasterObserver;
import org.apache.giraph.metrics.GiraphMetrics;
import org.apache.giraph.metrics.MetricGroup;
import org.apache.giraph.metrics.ResetSuperstepMetricsObserver;
@@ -152,6 +153,8 @@ public class BspServiceMaster<I extends
private List<WorkerInfo> chosenWorkerInfoList = Lists.newArrayList();
/** Limit locality information added to each InputSplit znode */
private final int localityLimit = 5;
+ /** Observers over master lifecycle. */
+ private final MasterObserver[] observers;
// Per-Superstep Metrics
/** MasterCompute time in msec */
@@ -191,6 +194,7 @@ public class BspServiceMaster<I extends
GiraphConfiguration.PARTITION_LONG_TAIL_MIN_PRINT_DEFAULT);
masterGraphPartitioner =
getGraphPartitionerFactory().createMasterGraphPartitioner();
+ observers = getConfiguration().createMasterObservers();
GiraphMetrics.getInstance().addSuperstepResetObserver(this);
GiraphStats.init(context);
@@ -735,6 +739,10 @@ public class BspServiceMaster<I extends
GiraphStats.getInstance().getSuperstepCounter().
setValue(getRestartedSuperstep());
}
+ for (MasterObserver observer : observers) {
+ observer.preApplication();
+ getContext().progress();
+ }
}
@Override
@@ -1341,6 +1349,11 @@ public class BspServiceMaster<I extends
// 5. Create superstep finished node
// 6. If the checkpoint frequency is met, finalize the checkpoint
+ for (MasterObserver observer : observers) {
+ observer.preSuperstep();
+ getContext().progress();
+ }
+
chosenWorkerInfoList = checkWorkers();
if (chosenWorkerInfoList == null) {
LOG.fatal("coordinateSuperstep: Not enough healthy workers for " +
@@ -1578,6 +1591,22 @@ public class BspServiceMaster<I extends
}
@Override
+ public void postApplication() {
+ for (MasterObserver observer : observers) {
+ observer.postApplication();
+ getContext().progress();
+ }
+ }
+
+ @Override
+ public void postSuperstep() {
+ for (MasterObserver observer : observers) {
+ observer.postSuperstep();
+ getContext().progress();
+ }
+ }
+
+ @Override
public void cleanup() throws IOException {
// All master processes should denote they are done by adding special
// znode. Once the number of znodes equals the number of partitions
Added:
giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/IntNullNullNullVertex.java
URL:
http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/IntNullNullNullVertex.java?rev=1410684&view=auto
==============================================================================
---
giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/IntNullNullNullVertex.java
(added)
+++
giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/IntNullNullNullVertex.java
Sat Nov 17 03:12:58 2012
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+
+import com.google.common.collect.ImmutableList;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * A vertex with no value, edges, or messages. Just an ID, nothing more.
+ */
+public abstract class IntNullNullNullVertex extends Vertex<IntWritable,
+ NullWritable, NullWritable, NullWritable> {
+ @Override
+ public void setEdges(Map<IntWritable, NullWritable> edges) { }
+
+ @Override
+ public Iterable<Edge<IntWritable, NullWritable>> getEdges() {
+ return ImmutableList.of();
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ getId().write(out);
+ out.writeBoolean(isHalted());
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ int id = in.readInt();
+ initialize(new IntWritable(id), NullWritable.get());
+ boolean halt = in.readBoolean();
+ if (halt) {
+ voteToHalt();
+ } else {
+ wakeUp();
+ }
+ }
+}
Modified:
giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/MasterThread.java
URL:
http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/MasterThread.java?rev=1410684&r1=1410683&r2=1410684&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/MasterThread.java
(original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/MasterThread.java
Sat Nov 17 03:12:58 2012
@@ -130,6 +130,8 @@ public class MasterThread<I extends Writ
increment(superstepMillis);
}
+ bspServiceMaster.postSuperstep();
+
// If a worker failed, restart from a known good superstep
if (superstepState == SuperstepState.WORKER_FAILURE) {
bspServiceMaster.restartFromCheckpoint(
@@ -184,5 +186,6 @@ public class MasterThread<I extends Writ
"KeeperException", e);
throw new IllegalStateException(e);
}
+ bspServiceMaster.postApplication();
}
}
Added:
giraph/trunk/giraph/src/main/java/org/apache/giraph/io/IntNullNullNullTextInputFormat.java
URL:
http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/io/IntNullNullNullTextInputFormat.java?rev=1410684&view=auto
==============================================================================
---
giraph/trunk/giraph/src/main/java/org/apache/giraph/io/IntNullNullNullTextInputFormat.java
(added)
+++
giraph/trunk/giraph/src/main/java/org/apache/giraph/io/IntNullNullNullTextInputFormat.java
Sat Nov 17 03:12:58 2012
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.io;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import com.google.common.collect.ImmutableMap;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Simple text-based {@link org.apache.giraph.graph.VertexInputFormat} for
+ * unweighted graphs without edges or values, just vertices with ids.
+ *
+ * Each line is just simply the vertex id.
+ */
+public class IntNullNullNullTextInputFormat extends TextVertexInputFormat<
+ IntWritable, NullWritable, NullWritable, NullWritable> {
+ @Override
+ public TextVertexReader createVertexReader(
+ InputSplit split, TaskAttemptContext context) throws IOException {
+ return new IntNullNullNullVertexReader();
+ }
+
+ /**
+ * Reader for this InputFormat.
+ */
+ public class IntNullNullNullVertexReader extends
+ TextVertexReaderFromEachLineProcessed<String> {
+ /** Cached vertex id */
+ private IntWritable id;
+
+ @Override
+ protected String preprocessLine(Text line) throws IOException {
+ id = new IntWritable(Integer.parseInt(line.toString()));
+ return line.toString();
+ }
+
+ @Override
+ protected IntWritable getId(String line) throws IOException {
+ return id;
+ }
+
+ @Override
+ protected NullWritable getValue(String line) throws IOException {
+ return NullWritable.get();
+ }
+
+ @Override
+ protected Map<IntWritable, NullWritable> getEdges(String line)
+ throws IOException {
+ return ImmutableMap.of();
+ }
+ }
+}
Added:
giraph/trunk/giraph/src/main/java/org/apache/giraph/master/DefaultMasterObserver.java
URL:
http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/master/DefaultMasterObserver.java?rev=1410684&view=auto
==============================================================================
---
giraph/trunk/giraph/src/main/java/org/apache/giraph/master/DefaultMasterObserver.java
(added)
+++
giraph/trunk/giraph/src/main/java/org/apache/giraph/master/DefaultMasterObserver.java
Sat Nov 17 03:12:58 2012
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.master;
+
+import org.apache.giraph.ImmutableClassesGiraphConfiguration;
+
+/**
+ * A no-op implementation of MasterObserver to make it easier for users.
+ */
+public class DefaultMasterObserver implements MasterObserver {
+ /** Configuration to use */
+ private ImmutableClassesGiraphConfiguration conf;
+
+ @Override
+ public void setConf(ImmutableClassesGiraphConfiguration configuration) {
+ this.conf = configuration;
+ }
+
+ @Override
+ public ImmutableClassesGiraphConfiguration getConf() {
+ return conf;
+ }
+
+ @Override
+ public void preApplication() {
+ }
+
+ @Override
+ public void postApplication() {
+ }
+
+ @Override
+ public void preSuperstep() {
+ }
+
+ @Override
+ public void postSuperstep() {
+ }
+}
Added:
giraph/trunk/giraph/src/main/java/org/apache/giraph/master/MasterObserver.java
URL:
http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/master/MasterObserver.java?rev=1410684&view=auto
==============================================================================
---
giraph/trunk/giraph/src/main/java/org/apache/giraph/master/MasterObserver.java
(added)
+++
giraph/trunk/giraph/src/main/java/org/apache/giraph/master/MasterObserver.java
Sat Nov 17 03:12:58 2012
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.master;
+
+import org.apache.giraph.ImmutableClassesGiraphConfigurable;
+
+/**
+ * Observer for Master.
+ */
+public interface MasterObserver extends ImmutableClassesGiraphConfigurable {
+ /**
+ * Before application begins.
+ */
+ void preApplication();
+
+ /**
+ * After application ends.
+ */
+ void postApplication();
+
+ /**
+ * Before each superstep starts.
+ */
+ void preSuperstep();
+
+ /**
+ * After each superstep ends.
+ */
+ void postSuperstep();
+}
Added:
giraph/trunk/giraph/src/main/java/org/apache/giraph/master/package-info.java
URL:
http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/master/package-info.java?rev=1410684&view=auto
==============================================================================
---
giraph/trunk/giraph/src/main/java/org/apache/giraph/master/package-info.java
(added)
+++
giraph/trunk/giraph/src/main/java/org/apache/giraph/master/package-info.java
Sat Nov 17 03:12:58 2012
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Package of all the observer related things.
+ */
+package org.apache.giraph.master;
Modified:
giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java
URL:
http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java?rev=1410684&r1=1410683&r2=1410684&view=diff
==============================================================================
---
giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java
(original)
+++
giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java
Sat Nov 17 03:12:58 2012
@@ -36,6 +36,7 @@ import org.apache.zookeeper.server.ZooKe
import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
import com.google.common.base.Charsets;
+import com.google.common.collect.ImmutableList;
import com.google.common.io.Files;
import java.io.File;
@@ -177,6 +178,7 @@ public class InternalVertexRunner {
boolean useVertexInputFormat = vertexInputFormatClass != null;
boolean useEdgeInputFormat = edgeInputFormatClass != null;
+ boolean outputData = vertexOutputFormatClass != null;
File tmpDir = null;
try {
@@ -215,8 +217,10 @@ public class InternalVertexRunner {
if (useEdgeInputFormat) {
job.getConfiguration().setEdgeInputFormatClass(edgeInputFormatClass);
}
- job.getConfiguration().setVertexOutputFormatClass(
- vertexOutputFormatClass);
+ if (outputData) {
+ job.getConfiguration().setVertexOutputFormatClass(
+ vertexOutputFormatClass);
+ }
if (workerContextClass != null) {
job.getConfiguration().setWorkerContextClass(workerContextClass);
}
@@ -257,17 +261,7 @@ public class InternalVertexRunner {
new Path(outputDir.toString()));
// Configure a local zookeeper instance
- Properties zkProperties = new Properties();
- zkProperties.setProperty("tickTime", "2000");
- zkProperties.setProperty("dataDir", zkDir.getAbsolutePath());
- zkProperties.setProperty("clientPort",
- String.valueOf(LOCAL_ZOOKEEPER_PORT));
- zkProperties.setProperty("maxClientCnxns", "10000");
- zkProperties.setProperty("minSessionTimeout", "10000");
- zkProperties.setProperty("maxSessionTimeout", "100000");
- zkProperties.setProperty("initLimit", "10");
- zkProperties.setProperty("syncLimit", "5");
- zkProperties.setProperty("snapCount", "50000");
+ Properties zkProperties = configLocalZooKeeper(zkDir);
QuorumPeerConfig qpConfig = new QuorumPeerConfig();
qpConfig.parseProperties(zkProperties);
@@ -295,8 +289,12 @@ public class InternalVertexRunner {
zookeeper.end();
}
- return Files.readLines(new File(outputDir, "part-m-00000"),
- Charsets.UTF_8);
+ if (outputData) {
+ return Files.readLines(new File(outputDir, "part-m-00000"),
+ Charsets.UTF_8);
+ } else {
+ return ImmutableList.of();
+ }
} finally {
FileUtils.delete(tmpDir);
}
@@ -304,6 +302,27 @@ public class InternalVertexRunner {
// CHECKSTYLE: resume ParameterNumberCheck
/**
+ * Configuration options for running local ZK.
+ *
+ * @param zkDir directory for ZK to hold files in.
+ * @return Properties configured for local ZK.
+ */
+ private static Properties configLocalZooKeeper(File zkDir) {
+ Properties zkProperties = new Properties();
+ zkProperties.setProperty("tickTime", "2000");
+ zkProperties.setProperty("dataDir", zkDir.getAbsolutePath());
+ zkProperties.setProperty("clientPort",
+ String.valueOf(LOCAL_ZOOKEEPER_PORT));
+ zkProperties.setProperty("maxClientCnxns", "10000");
+ zkProperties.setProperty("minSessionTimeout", "10000");
+ zkProperties.setProperty("maxSessionTimeout", "100000");
+ zkProperties.setProperty("initLimit", "10");
+ zkProperties.setProperty("syncLimit", "5");
+ zkProperties.setProperty("snapCount", "50000");
+ return zkProperties;
+ }
+
+ /**
* Extension of {@link ZooKeeperServerMain} that allows programmatic shutdown
*/
private static class InternalZooKeeper extends ZooKeeperServerMain {
Added:
giraph/trunk/giraph/src/test/java/org/apache/giraph/TestGiraphConfiguration.java
URL:
http://svn.apache.org/viewvc/giraph/trunk/giraph/src/test/java/org/apache/giraph/TestGiraphConfiguration.java?rev=1410684&view=auto
==============================================================================
---
giraph/trunk/giraph/src/test/java/org/apache/giraph/TestGiraphConfiguration.java
(added)
+++
giraph/trunk/giraph/src/test/java/org/apache/giraph/TestGiraphConfiguration.java
Sat Nov 17 03:12:58 2012
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+public class TestGiraphConfiguration {
+ public interface If { }
+ public class A implements If { }
+ public class B implements If { }
+ public class C implements If { }
+
+ @Test
+ public void testSetClasses() {
+ GiraphConfiguration conf = new GiraphConfiguration();
+ conf.setClasses("foo", If.class, A.class, B.class);
+ Class<?>[] klasses = conf.getClasses("foo");
+ assertEquals(2, klasses.length);
+ assertEquals(A.class, klasses[0]);
+ assertEquals(B.class, klasses[1]);
+
+ try {
+ conf.setClasses("foo", A.class, B.class);
+ fail();
+ } catch (RuntimeException e) {
+ assertEquals(2, conf.getClasses("foo").length);
+ }
+
+ Class<? extends If>[] klasses2 = conf.getClassesOfType("foo", If.class);
+ assertEquals(2, klasses2.length);
+ assertEquals(A.class, klasses2[0]);
+ assertEquals(B.class, klasses2[1]);
+ }
+
+ @Test
+ public void testAddToClasses() {
+ GiraphConfiguration conf = new GiraphConfiguration();
+
+ conf.setClasses("foo", If.class, A.class, B.class);
+ conf.addToClasses("foo", C.class, If.class);
+ Class<?>[] klasses = conf.getClasses("foo");
+ assertEquals(3, klasses.length);
+ assertEquals(A.class, klasses[0]);
+ assertEquals(B.class, klasses[1]);
+ assertEquals(C.class, klasses[2]);
+
+ conf.addToClasses("bar", B.class, If.class);
+ klasses = conf.getClasses("bar");
+ assertEquals(1, klasses.length);
+ assertEquals(B.class, klasses[0]);
+ }
+}
Added:
giraph/trunk/giraph/src/test/java/org/apache/giraph/TestMasterObserver.java
URL:
http://svn.apache.org/viewvc/giraph/trunk/giraph/src/test/java/org/apache/giraph/TestMasterObserver.java?rev=1410684&view=auto
==============================================================================
--- giraph/trunk/giraph/src/test/java/org/apache/giraph/TestMasterObserver.java
(added)
+++ giraph/trunk/giraph/src/test/java/org/apache/giraph/TestMasterObserver.java
Sat Nov 17 03:12:58 2012
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph;
+
+import org.apache.giraph.graph.IntNullNullNullVertex;
+import org.apache.giraph.io.IntNullNullNullTextInputFormat;
+import org.apache.giraph.master.DefaultMasterObserver;
+import org.apache.giraph.utils.InternalVertexRunner;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.util.StringUtils;
+import org.junit.Test;
+
+import com.google.common.collect.Maps;
+
+import java.io.IOException;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestMasterObserver {
+ public static class NoOpVertex extends IntNullNullNullVertex {
+ private int count = 0;
+
+ @Override
+ public void compute(Iterable<NullWritable> messages) throws IOException {
+ if (count == 2) {
+ voteToHalt();
+ }
+ ++count;
+ }
+ }
+
+ public static class Obs extends DefaultMasterObserver {
+ public static int preApp = 0;
+ public static int preSuperstep = 0;
+ public static int postSuperstep = 0;
+ public static int postApp = 0;
+
+ @Override
+ public void preApplication() {
+ ++preApp;
+ }
+
+ @Override
+ public void postApplication() {
+ ++postApp;
+ }
+
+ @Override
+ public void preSuperstep() {
+ ++preSuperstep;
+ }
+
+ @Override
+ public void postSuperstep() {
+ ++postSuperstep;
+ }
+ }
+
+ @Test
+ public void testGetsCalled() throws Exception {
+ assertEquals(0, Obs.postApp);
+
+ String[] graph = new String[] { "1", "2", "3" };
+
+ Map<String, String> params = Maps.newHashMap();
+ String klasses[] = new String[] {
+ Obs.class.getName(),
+ Obs.class.getName()
+ };
+ params.put(GiraphConfiguration.MASTER_OBSERVER_CLASSES,
+ StringUtils.arrayToString(klasses));
+
+ InternalVertexRunner.run(NoOpVertex.class,
+ IntNullNullNullTextInputFormat.class, null, params, graph);
+
+ assertEquals(2, Obs.preApp);
+ // 3 supersteps + 1 input superstep * 2 observers = 8 callbacks
+ assertEquals(8, Obs.preSuperstep);
+ assertEquals(8, Obs.postSuperstep);
+ assertEquals(2, Obs.postApp);
+ }
+}