Author: nitay
Date: Sat Nov 17 03:12:58 2012
New Revision: 1410684

URL: http://svn.apache.org/viewvc?rev=1410684&view=rev
Log:
GIRAPH-416: MasterObserver for user post-application customization (nitay)

Added:
    
giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/IntNullNullNullVertex.java
    
giraph/trunk/giraph/src/main/java/org/apache/giraph/io/IntNullNullNullTextInputFormat.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/master/
    
giraph/trunk/giraph/src/main/java/org/apache/giraph/master/DefaultMasterObserver.java
    
giraph/trunk/giraph/src/main/java/org/apache/giraph/master/MasterObserver.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/master/package-info.java
    
giraph/trunk/giraph/src/test/java/org/apache/giraph/TestGiraphConfiguration.java
    giraph/trunk/giraph/src/test/java/org/apache/giraph/TestMasterObserver.java
Modified:
    giraph/trunk/CHANGELOG
    giraph/trunk/giraph/src/main/java/org/apache/giraph/GiraphConfiguration.java
    
giraph/trunk/giraph/src/main/java/org/apache/giraph/ImmutableClassesGiraphConfiguration.java
    
giraph/trunk/giraph/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java
    
giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceMaster.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/MasterThread.java
    
giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java

Modified: giraph/trunk/CHANGELOG
URL: 
http://svn.apache.org/viewvc/giraph/trunk/CHANGELOG?rev=1410684&r1=1410683&r2=1410684&view=diff
==============================================================================
--- giraph/trunk/CHANGELOG (original)
+++ giraph/trunk/CHANGELOG Sat Nov 17 03:12:58 2012
@@ -1,6 +1,8 @@
 Giraph Change Log
 
 Release 0.2.0 - unreleased
+  GIRAPH-416: MasterObserver for user post-application customization (nitay)
+
   GIRAPH-427: Add committer information for Nitay Joffe to pom.xml (nitay)
 
   GIRAPH-417: Serialize the graph/message cache into byte[] for

Modified: 
giraph/trunk/giraph/src/main/java/org/apache/giraph/GiraphConfiguration.java
URL: 
http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/GiraphConfiguration.java?rev=1410684&r1=1410683&r2=1410684&view=diff
==============================================================================
--- 
giraph/trunk/giraph/src/main/java/org/apache/giraph/GiraphConfiguration.java 
(original)
+++ 
giraph/trunk/giraph/src/main/java/org/apache/giraph/GiraphConfiguration.java 
Sat Nov 17 03:12:58 2012
@@ -28,9 +28,9 @@ import org.apache.giraph.graph.VertexOut
 import org.apache.giraph.graph.VertexResolver;
 import org.apache.giraph.graph.WorkerContext;
 import org.apache.giraph.graph.partition.GraphPartitionerFactory;
-
 import org.apache.giraph.graph.partition.Partition;
 
+import org.apache.giraph.master.MasterObserver;
 import org.apache.hadoop.conf.Configuration;
 
 /**
@@ -44,6 +44,9 @@ public class GiraphConfiguration extends
 
   /** Class for Master - optional */
   public static final String MASTER_COMPUTE_CLASS = 
"giraph.masterComputeClass";
+  /** Classes for Observer Master - optional */
+  public static final String MASTER_OBSERVER_CLASSES =
+      "giraph.master.observers";
   /** Vertex combiner class - optional */
   public static final String VERTEX_COMBINER_CLASS =
       "giraph.combinerClass";
@@ -695,6 +698,82 @@ public class GiraphConfiguration extends
   }
 
   /**
+   * Add a MasterObserver class (optional)
+   *
+   * @param masterObserverClass MasterObserver class to add.
+   */
+  public final void addMasterObserverClass(
+      Class<? extends MasterObserver> masterObserverClass) {
+    addToClasses(MASTER_OBSERVER_CLASSES, masterObserverClass,
+        MasterObserver.class);
+  }
+
+  /**
+   * Add a class to a property that is a list of classes. If the property does
+   * not exist it will be created.
+   *
+   * @param name String name of property.
+   * @param klass interface of the class being set.
+   * @param xface Class to add to the list.
+   */
+  public final void addToClasses(String name, Class<?> klass, Class<?> xface) {
+    if (!xface.isAssignableFrom(klass)) {
+      throw new RuntimeException(klass + " does not implement " +
+          xface.getName());
+    }
+    String value;
+    String klasses = get(name);
+    if (klasses == null) {
+      value = klass.getName();
+    } else {
+      value = klasses + "," + klass.getName();
+    }
+    set(name, value);
+  }
+
+  /**
+   * Set mapping from a key name to a list of classes.
+   *
+   * @param name String key name to use.
+   * @param xface interface of the classes being set.
+   * @param klasses Classes to set.
+   */
+  public final void setClasses(String name, Class<?> xface,
+                               Class<?> ... klasses) {
+    String[] klassNames = new String[klasses.length];
+    for (int i = 0; i < klasses.length; ++i) {
+      Class<?> klass = klasses[i];
+      if (!xface.isAssignableFrom(klass)) {
+        throw new RuntimeException(klass + " does not implement " +
+            xface.getName());
+      }
+      klassNames[i] = klasses[i].getName();
+    }
+    setStrings(name, klassNames);
+  }
+
+  /**
+   * Get classes from a property that all implement a given interface.
+   *
+   * @param name String name of property to fetch.
+   * @param xface interface classes must implement.
+   * @param defaultValue If not found, return this
+   * @param <T> Generic type of interface class
+   * @return array of Classes implementing interface specified.
+   */
+  public final <T> Class<? extends T>[] getClassesOfType(String name,
+      Class<T> xface, Class<? extends T> ... defaultValue) {
+    Class<?>[] klasses = getClasses(name, defaultValue);
+    for (Class<?> klass : klasses) {
+      if (!xface.isAssignableFrom(klass)) {
+        throw new RuntimeException(klass + " is not assignable from " +
+            xface.getName());
+      }
+    }
+    return (Class<? extends T>[]) klasses;
+  }
+
+  /**
    * Set the vertex output format class (optional)
    *
    * @param vertexOutputFormatClass Determines how graph is output
@@ -826,6 +905,15 @@ public class GiraphConfiguration extends
   }
 
   /**
+   * Get array of MasterObserver classes set in the configuration.
+   *
+   * @return array of MasterObserver classes.
+   */
+  public Class<? extends MasterObserver>[] getMasterObserverClasses() {
+    return getClassesOfType(MASTER_OBSERVER_CLASSES, MasterObserver.class);
+  }
+
+  /**
    * Should we dump metrics at the end of the job.
    *
    * @return true if we should dump metrics, false otherwise.

Modified: 
giraph/trunk/giraph/src/main/java/org/apache/giraph/ImmutableClassesGiraphConfiguration.java
URL: 
http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/ImmutableClassesGiraphConfiguration.java?rev=1410684&r1=1410683&r2=1410684&view=diff
==============================================================================
--- 
giraph/trunk/giraph/src/main/java/org/apache/giraph/ImmutableClassesGiraphConfiguration.java
 (original)
+++ 
giraph/trunk/giraph/src/main/java/org/apache/giraph/ImmutableClassesGiraphConfiguration.java
 Sat Nov 17 03:12:58 2012
@@ -18,7 +18,6 @@
 
 package org.apache.giraph;
 
-import java.util.List;
 import org.apache.giraph.graph.AggregatorWriter;
 import org.apache.giraph.graph.Combiner;
 import org.apache.giraph.graph.DefaultMasterCompute;
@@ -38,6 +37,7 @@ import org.apache.giraph.graph.partition
 import org.apache.giraph.graph.partition.Partition;
 import org.apache.giraph.graph.partition.PartitionStats;
 import org.apache.giraph.graph.partition.SimplePartition;
+import org.apache.giraph.master.MasterObserver;
 import org.apache.giraph.utils.ExtendedByteArrayDataInput;
 import org.apache.giraph.utils.ExtendedByteArrayDataOutput;
 import org.apache.giraph.utils.ExtendedDataInput;
@@ -51,6 +51,8 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.util.Progressable;
 
+import java.util.List;
+
 /**
  * The classes set here are immutable, the remaining configuration is mutable.
  * Classes are immutable and final to provide the best performance for
@@ -457,6 +459,20 @@ public class ImmutableClassesGiraphConfi
   }
 
   /**
+   * Create array of MasterObservers.
+   *
+   * @return Instantiated array of MasterObservers.
+   */
+  public MasterObserver[] createMasterObservers() {
+    Class<? extends MasterObserver>[] klasses = getMasterObserverClasses();
+    MasterObserver[] objects = new MasterObserver[klasses.length];
+    for (int i = 0; i < klasses.length; ++i) {
+      objects[i] = ReflectionUtils.newInstance(klasses[i], this);
+    }
+    return objects;
+  }
+
+  /**
    * Get the user's subclassed edge value class.
    *
    * @return User's vertex edge value class

Modified: 
giraph/trunk/giraph/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java
URL: 
http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java?rev=1410684&r1=1410683&r2=1410684&view=diff
==============================================================================
--- 
giraph/trunk/giraph/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java
 (original)
+++ 
giraph/trunk/giraph/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java
 Sat Nov 17 03:12:58 2012
@@ -18,14 +18,14 @@
 
 package org.apache.giraph.bsp;
 
-import java.io.IOException;
-
 import org.apache.giraph.graph.MasterAggregatorHandler;
 import org.apache.giraph.graph.MasterInfo;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.zookeeper.KeeperException;
 
+import java.io.IOException;
+
 /**
  * At most, there will be one active master at a time, but many threads can
  * be trying to be the active master.
@@ -122,4 +122,14 @@ public interface CentralizedServiceMaste
    * @return Master aggregator handler
    */
   MasterAggregatorHandler getAggregatorHandler();
+
+  /**
+   * Superstep has finished.
+   */
+  void postSuperstep();
+
+  /**
+   * Application has finished.
+   */
+  void postApplication();
 }

Modified: 
giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceMaster.java
URL: 
http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceMaster.java?rev=1410684&r1=1410683&r2=1410684&view=diff
==============================================================================
--- 
giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceMaster.java 
(original)
+++ 
giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceMaster.java 
Sat Nov 17 03:12:58 2012
@@ -34,6 +34,7 @@ import org.apache.giraph.graph.partition
 import org.apache.giraph.graph.partition.PartitionOwner;
 import org.apache.giraph.graph.partition.PartitionStats;
 import org.apache.giraph.graph.partition.PartitionUtils;
+import org.apache.giraph.master.MasterObserver;
 import org.apache.giraph.metrics.GiraphMetrics;
 import org.apache.giraph.metrics.MetricGroup;
 import org.apache.giraph.metrics.ResetSuperstepMetricsObserver;
@@ -152,6 +153,8 @@ public class BspServiceMaster<I extends 
   private List<WorkerInfo> chosenWorkerInfoList = Lists.newArrayList();
   /** Limit locality information added to each InputSplit znode */
   private final int localityLimit = 5;
+  /** Observers over master lifecycle. */
+  private final MasterObserver[] observers;
 
   // Per-Superstep Metrics
   /** MasterCompute time in msec */
@@ -191,6 +194,7 @@ public class BspServiceMaster<I extends 
         GiraphConfiguration.PARTITION_LONG_TAIL_MIN_PRINT_DEFAULT);
     masterGraphPartitioner =
         getGraphPartitionerFactory().createMasterGraphPartitioner();
+    observers = getConfiguration().createMasterObservers();
 
     GiraphMetrics.getInstance().addSuperstepResetObserver(this);
     GiraphStats.init(context);
@@ -735,6 +739,10 @@ public class BspServiceMaster<I extends 
       GiraphStats.getInstance().getSuperstepCounter().
         setValue(getRestartedSuperstep());
     }
+    for (MasterObserver observer : observers) {
+      observer.preApplication();
+      getContext().progress();
+    }
   }
 
   @Override
@@ -1341,6 +1349,11 @@ public class BspServiceMaster<I extends 
     // 5. Create superstep finished node
     // 6. If the checkpoint frequency is met, finalize the checkpoint
 
+    for (MasterObserver observer : observers) {
+      observer.preSuperstep();
+      getContext().progress();
+    }
+
     chosenWorkerInfoList = checkWorkers();
     if (chosenWorkerInfoList == null) {
       LOG.fatal("coordinateSuperstep: Not enough healthy workers for " +
@@ -1578,6 +1591,22 @@ public class BspServiceMaster<I extends 
   }
 
   @Override
+  public void postApplication() {
+    for (MasterObserver observer : observers) {
+      observer.postApplication();
+      getContext().progress();
+    }
+  }
+
+  @Override
+  public void postSuperstep() {
+    for (MasterObserver observer : observers) {
+      observer.postSuperstep();
+      getContext().progress();
+    }
+  }
+
+  @Override
   public void cleanup() throws IOException {
     // All master processes should denote they are done by adding special
     // znode.  Once the number of znodes equals the number of partitions

Added: 
giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/IntNullNullNullVertex.java
URL: 
http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/IntNullNullNullVertex.java?rev=1410684&view=auto
==============================================================================
--- 
giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/IntNullNullNullVertex.java
 (added)
+++ 
giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/IntNullNullNullVertex.java
 Sat Nov 17 03:12:58 2012
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+
+import com.google.common.collect.ImmutableList;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * A vertex with no value, edges, or messages. Just an ID, nothing more.
+ */
+public abstract class IntNullNullNullVertex extends Vertex<IntWritable,
+    NullWritable, NullWritable, NullWritable> {
+  @Override
+  public void setEdges(Map<IntWritable, NullWritable> edges) { }
+
+  @Override
+  public Iterable<Edge<IntWritable, NullWritable>> getEdges() {
+    return ImmutableList.of();
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    getId().write(out);
+    out.writeBoolean(isHalted());
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    int id = in.readInt();
+    initialize(new IntWritable(id), NullWritable.get());
+    boolean halt = in.readBoolean();
+    if (halt) {
+      voteToHalt();
+    } else {
+      wakeUp();
+    }
+  }
+}

Modified: 
giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/MasterThread.java
URL: 
http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/MasterThread.java?rev=1410684&r1=1410683&r2=1410684&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/MasterThread.java 
(original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/MasterThread.java 
Sat Nov 17 03:12:58 2012
@@ -130,6 +130,8 @@ public class MasterThread<I extends Writ
                 increment(superstepMillis);
             }
 
+            bspServiceMaster.postSuperstep();
+
             // If a worker failed, restart from a known good superstep
             if (superstepState == SuperstepState.WORKER_FAILURE) {
               bspServiceMaster.restartFromCheckpoint(
@@ -184,5 +186,6 @@ public class MasterThread<I extends Writ
           "KeeperException", e);
       throw new IllegalStateException(e);
     }
+    bspServiceMaster.postApplication();
   }
 }

Added: 
giraph/trunk/giraph/src/main/java/org/apache/giraph/io/IntNullNullNullTextInputFormat.java
URL: 
http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/io/IntNullNullNullTextInputFormat.java?rev=1410684&view=auto
==============================================================================
--- 
giraph/trunk/giraph/src/main/java/org/apache/giraph/io/IntNullNullNullTextInputFormat.java
 (added)
+++ 
giraph/trunk/giraph/src/main/java/org/apache/giraph/io/IntNullNullNullTextInputFormat.java
 Sat Nov 17 03:12:58 2012
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.io;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import com.google.common.collect.ImmutableMap;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Simple text-based {@link org.apache.giraph.graph.VertexInputFormat} for
+ * unweighted graphs without edges or values, just vertices with ids.
+ *
+ * Each line is just simply the vertex id.
+ */
+public class IntNullNullNullTextInputFormat extends TextVertexInputFormat<
+    IntWritable, NullWritable, NullWritable, NullWritable> {
+  @Override
+  public TextVertexReader createVertexReader(
+      InputSplit split, TaskAttemptContext context) throws IOException {
+    return new IntNullNullNullVertexReader();
+  }
+
+  /**
+   * Reader for this InputFormat.
+   */
+  public class IntNullNullNullVertexReader extends
+      TextVertexReaderFromEachLineProcessed<String> {
+    /** Cached vertex id */
+    private IntWritable id;
+
+    @Override
+    protected String preprocessLine(Text line) throws IOException {
+      id = new IntWritable(Integer.parseInt(line.toString()));
+      return line.toString();
+    }
+
+    @Override
+    protected IntWritable getId(String line) throws IOException {
+      return id;
+    }
+
+    @Override
+    protected NullWritable getValue(String line) throws IOException {
+      return NullWritable.get();
+    }
+
+    @Override
+    protected Map<IntWritable, NullWritable> getEdges(String line)
+      throws IOException {
+      return ImmutableMap.of();
+    }
+  }
+}

Added: 
giraph/trunk/giraph/src/main/java/org/apache/giraph/master/DefaultMasterObserver.java
URL: 
http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/master/DefaultMasterObserver.java?rev=1410684&view=auto
==============================================================================
--- 
giraph/trunk/giraph/src/main/java/org/apache/giraph/master/DefaultMasterObserver.java
 (added)
+++ 
giraph/trunk/giraph/src/main/java/org/apache/giraph/master/DefaultMasterObserver.java
 Sat Nov 17 03:12:58 2012
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.master;
+
+import org.apache.giraph.ImmutableClassesGiraphConfiguration;
+
+/**
+ * A no-op implementation of MasterObserver to make it easier for users.
+ */
+public class DefaultMasterObserver implements MasterObserver {
+  /** Configuration to use */
+  private ImmutableClassesGiraphConfiguration conf;
+
+  @Override
+  public void setConf(ImmutableClassesGiraphConfiguration configuration) {
+    this.conf = configuration;
+  }
+
+  @Override
+  public ImmutableClassesGiraphConfiguration getConf() {
+    return conf;
+  }
+
+  @Override
+  public void preApplication() {
+  }
+
+  @Override
+  public void postApplication() {
+  }
+
+  @Override
+  public void preSuperstep() {
+  }
+
+  @Override
+  public void postSuperstep() {
+  }
+}

Added: 
giraph/trunk/giraph/src/main/java/org/apache/giraph/master/MasterObserver.java
URL: 
http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/master/MasterObserver.java?rev=1410684&view=auto
==============================================================================
--- 
giraph/trunk/giraph/src/main/java/org/apache/giraph/master/MasterObserver.java 
(added)
+++ 
giraph/trunk/giraph/src/main/java/org/apache/giraph/master/MasterObserver.java 
Sat Nov 17 03:12:58 2012
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.master;
+
+import org.apache.giraph.ImmutableClassesGiraphConfigurable;
+
+/**
+ * Observer for Master.
+ */
+public interface MasterObserver extends ImmutableClassesGiraphConfigurable {
+  /**
+   * Before application begins.
+   */
+  void preApplication();
+
+  /**
+   * After application ends.
+   */
+  void postApplication();
+
+  /**
+   * Before each superstep starts.
+   */
+  void preSuperstep();
+
+  /**
+   * After each superstep ends.
+   */
+  void postSuperstep();
+}

Added: 
giraph/trunk/giraph/src/main/java/org/apache/giraph/master/package-info.java
URL: 
http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/master/package-info.java?rev=1410684&view=auto
==============================================================================
--- 
giraph/trunk/giraph/src/main/java/org/apache/giraph/master/package-info.java 
(added)
+++ 
giraph/trunk/giraph/src/main/java/org/apache/giraph/master/package-info.java 
Sat Nov 17 03:12:58 2012
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Package of all the observer related things.
+ */
+package org.apache.giraph.master;

Modified: 
giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java
URL: 
http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java?rev=1410684&r1=1410683&r2=1410684&view=diff
==============================================================================
--- 
giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java
 (original)
+++ 
giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java
 Sat Nov 17 03:12:58 2012
@@ -36,6 +36,7 @@ import org.apache.zookeeper.server.ZooKe
 import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
 
 import com.google.common.base.Charsets;
+import com.google.common.collect.ImmutableList;
 import com.google.common.io.Files;
 
 import java.io.File;
@@ -177,6 +178,7 @@ public class InternalVertexRunner {
 
     boolean useVertexInputFormat = vertexInputFormatClass != null;
     boolean useEdgeInputFormat = edgeInputFormatClass != null;
+    boolean outputData = vertexOutputFormatClass != null;
 
     File tmpDir = null;
     try {
@@ -215,8 +217,10 @@ public class InternalVertexRunner {
       if (useEdgeInputFormat) {
         job.getConfiguration().setEdgeInputFormatClass(edgeInputFormatClass);
       }
-      job.getConfiguration().setVertexOutputFormatClass(
-          vertexOutputFormatClass);
+      if (outputData) {
+        job.getConfiguration().setVertexOutputFormatClass(
+            vertexOutputFormatClass);
+      }
       if (workerContextClass != null) {
         job.getConfiguration().setWorkerContextClass(workerContextClass);
       }
@@ -257,17 +261,7 @@ public class InternalVertexRunner {
                                      new Path(outputDir.toString()));
 
       // Configure a local zookeeper instance
-      Properties zkProperties = new Properties();
-      zkProperties.setProperty("tickTime", "2000");
-      zkProperties.setProperty("dataDir", zkDir.getAbsolutePath());
-      zkProperties.setProperty("clientPort",
-          String.valueOf(LOCAL_ZOOKEEPER_PORT));
-      zkProperties.setProperty("maxClientCnxns", "10000");
-      zkProperties.setProperty("minSessionTimeout", "10000");
-      zkProperties.setProperty("maxSessionTimeout", "100000");
-      zkProperties.setProperty("initLimit", "10");
-      zkProperties.setProperty("syncLimit", "5");
-      zkProperties.setProperty("snapCount", "50000");
+      Properties zkProperties = configLocalZooKeeper(zkDir);
 
       QuorumPeerConfig qpConfig = new QuorumPeerConfig();
       qpConfig.parseProperties(zkProperties);
@@ -295,8 +289,12 @@ public class InternalVertexRunner {
         zookeeper.end();
       }
 
-      return Files.readLines(new File(outputDir, "part-m-00000"),
-          Charsets.UTF_8);
+      if (outputData) {
+        return Files.readLines(new File(outputDir, "part-m-00000"),
+            Charsets.UTF_8);
+      } else {
+        return ImmutableList.of();
+      }
     } finally {
       FileUtils.delete(tmpDir);
     }
@@ -304,6 +302,27 @@ public class InternalVertexRunner {
   // CHECKSTYLE: resume ParameterNumberCheck
 
   /**
+   * Configuration options for running local ZK.
+   *
+   * @param zkDir directory for ZK to hold files in.
+   * @return Properties configured for local ZK.
+   */
+  private static Properties configLocalZooKeeper(File zkDir) {
+    Properties zkProperties = new Properties();
+    zkProperties.setProperty("tickTime", "2000");
+    zkProperties.setProperty("dataDir", zkDir.getAbsolutePath());
+    zkProperties.setProperty("clientPort",
+        String.valueOf(LOCAL_ZOOKEEPER_PORT));
+    zkProperties.setProperty("maxClientCnxns", "10000");
+    zkProperties.setProperty("minSessionTimeout", "10000");
+    zkProperties.setProperty("maxSessionTimeout", "100000");
+    zkProperties.setProperty("initLimit", "10");
+    zkProperties.setProperty("syncLimit", "5");
+    zkProperties.setProperty("snapCount", "50000");
+    return zkProperties;
+  }
+
+  /**
    * Extension of {@link ZooKeeperServerMain} that allows programmatic shutdown
    */
   private static class InternalZooKeeper extends ZooKeeperServerMain {

Added: 
giraph/trunk/giraph/src/test/java/org/apache/giraph/TestGiraphConfiguration.java
URL: 
http://svn.apache.org/viewvc/giraph/trunk/giraph/src/test/java/org/apache/giraph/TestGiraphConfiguration.java?rev=1410684&view=auto
==============================================================================
--- 
giraph/trunk/giraph/src/test/java/org/apache/giraph/TestGiraphConfiguration.java
 (added)
+++ 
giraph/trunk/giraph/src/test/java/org/apache/giraph/TestGiraphConfiguration.java
 Sat Nov 17 03:12:58 2012
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+public class TestGiraphConfiguration {
+  public interface If { }
+  public class A implements If { }
+  public class B implements If { }
+  public class C implements If { }
+
+  @Test
+  public void testSetClasses() {
+    GiraphConfiguration conf = new GiraphConfiguration();
+    conf.setClasses("foo", If.class, A.class, B.class);
+    Class<?>[] klasses = conf.getClasses("foo");
+    assertEquals(2, klasses.length);
+    assertEquals(A.class, klasses[0]);
+    assertEquals(B.class, klasses[1]);
+
+    try {
+      conf.setClasses("foo", A.class, B.class);
+      fail();
+    } catch (RuntimeException e) {
+      assertEquals(2, conf.getClasses("foo").length);
+    }
+
+    Class<? extends If>[] klasses2 = conf.getClassesOfType("foo", If.class);
+    assertEquals(2, klasses2.length);
+    assertEquals(A.class, klasses2[0]);
+    assertEquals(B.class, klasses2[1]);
+  }
+
+  @Test
+  public void testAddToClasses() {
+    GiraphConfiguration conf = new GiraphConfiguration();
+
+    conf.setClasses("foo", If.class, A.class, B.class);
+    conf.addToClasses("foo", C.class, If.class);
+    Class<?>[] klasses = conf.getClasses("foo");
+    assertEquals(3, klasses.length);
+    assertEquals(A.class, klasses[0]);
+    assertEquals(B.class, klasses[1]);
+    assertEquals(C.class, klasses[2]);
+
+    conf.addToClasses("bar", B.class, If.class);
+    klasses = conf.getClasses("bar");
+    assertEquals(1, klasses.length);
+    assertEquals(B.class, klasses[0]);
+  }
+}

Added: 
giraph/trunk/giraph/src/test/java/org/apache/giraph/TestMasterObserver.java
URL: 
http://svn.apache.org/viewvc/giraph/trunk/giraph/src/test/java/org/apache/giraph/TestMasterObserver.java?rev=1410684&view=auto
==============================================================================
--- giraph/trunk/giraph/src/test/java/org/apache/giraph/TestMasterObserver.java 
(added)
+++ giraph/trunk/giraph/src/test/java/org/apache/giraph/TestMasterObserver.java 
Sat Nov 17 03:12:58 2012
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph;
+
+import org.apache.giraph.graph.IntNullNullNullVertex;
+import org.apache.giraph.io.IntNullNullNullTextInputFormat;
+import org.apache.giraph.master.DefaultMasterObserver;
+import org.apache.giraph.utils.InternalVertexRunner;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.util.StringUtils;
+import org.junit.Test;
+
+import com.google.common.collect.Maps;
+
+import java.io.IOException;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestMasterObserver {
+  public static class NoOpVertex extends IntNullNullNullVertex {
+    private int count = 0;
+
+    @Override
+    public void compute(Iterable<NullWritable> messages) throws IOException {
+      if (count == 2) {
+        voteToHalt();
+      }
+      ++count;
+    }
+  }
+
+  public static class Obs extends DefaultMasterObserver {
+    public static int preApp = 0;
+    public static int preSuperstep = 0;
+    public static int postSuperstep = 0;
+    public static int postApp = 0;
+
+    @Override
+    public void preApplication() {
+      ++preApp;
+    }
+
+    @Override
+    public void postApplication() {
+      ++postApp;
+    }
+
+    @Override
+    public void preSuperstep() {
+      ++preSuperstep;
+    }
+
+    @Override
+    public void postSuperstep() {
+      ++postSuperstep;
+    }
+  }
+
+  @Test
+  public void testGetsCalled() throws Exception {
+    assertEquals(0, Obs.postApp);
+
+    String[] graph = new String[] { "1", "2", "3" };
+
+    Map<String, String> params = Maps.newHashMap();
+    String klasses[] = new String[] {
+        Obs.class.getName(),
+        Obs.class.getName()
+    };
+    params.put(GiraphConfiguration.MASTER_OBSERVER_CLASSES,
+        StringUtils.arrayToString(klasses));
+
+    InternalVertexRunner.run(NoOpVertex.class,
+        IntNullNullNullTextInputFormat.class, null, params, graph);
+
+    assertEquals(2, Obs.preApp);
+    // 3 supersteps + 1 input superstep * 2 observers = 8 callbacks
+    assertEquals(8, Obs.preSuperstep);
+    assertEquals(8, Obs.postSuperstep);
+    assertEquals(2, Obs.postApp);
+  }
+}


Reply via email to