Author: maja
Date: Fri Nov 30 19:50:06 2012
New Revision: 1415806

URL: http://svn.apache.org/viewvc?rev=1415806&view=rev
Log:
GIRAPH-437: Missing progress calls when stopping Netty server

Modified:
    giraph/trunk/CHANGELOG
    giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyMasterServer.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyServer.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceMaster.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ProgressableUtils.java
    giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/ConnectionTest.java
    giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestFailureTest.java
    giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestTest.java
    giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/SaslConnectionTest.java

Modified: giraph/trunk/CHANGELOG
URL: 
http://svn.apache.org/viewvc/giraph/trunk/CHANGELOG?rev=1415806&r1=1415805&r2=1415806&view=diff
==============================================================================
--- giraph/trunk/CHANGELOG (original)
+++ giraph/trunk/CHANGELOG Fri Nov 30 19:50:06 2012
@@ -1,6 +1,8 @@
 Giraph Change Log
 
 Release 0.2.0 - unreleased
+  GIRAPH-437: Missing progress calls when stopping Netty server (majakabiljo)
+
   GIRAPH-439: Fix naming of input superstep counter (apresta)
 
   GIRAPH-424: Fix hashCode modulo computation (majakabiljo)

Modified: 
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyMasterServer.java
URL: 
http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyMasterServer.java?rev=1415806&r1=1415805&r2=1415806&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyMasterServer.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyMasterServer.java Fri Nov 30 19:50:06 2012
@@ -22,6 +22,7 @@ import org.apache.giraph.ImmutableClasse
 import org.apache.giraph.bsp.CentralizedServiceMaster;
 import org.apache.giraph.comm.netty.handler.MasterRequestServerHandler;
 import org.apache.giraph.comm.MasterServer;
+import org.apache.hadoop.util.Progressable;
 
 import java.net.InetSocketAddress;
 
@@ -37,12 +38,14 @@ public class NettyMasterServer implement
    *
    * @param conf Hadoop configuration
    * @param service Centralized service
+   * @param progressable Progressable for reporting progress
    */
   public NettyMasterServer(ImmutableClassesGiraphConfiguration conf,
-      CentralizedServiceMaster<?, ?, ?, ?> service) {
+      CentralizedServiceMaster<?, ?, ?, ?> service,
+      Progressable progressable) {
     nettyServer = new NettyServer(conf,
         new MasterRequestServerHandler.Factory(service.getAggregatorHandler()),
-        service.getMasterInfo());
+        service.getMasterInfo(), progressable);
     nettyServer.start();
   }
 

Modified: 
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyServer.java
URL: 
http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyServer.java?rev=1415806&r1=1415805&r2=1415806&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyServer.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyServer.java Fri Nov 30 19:50:06 2012
@@ -40,6 +40,8 @@ import java.util.concurrent.TimeUnit;
 import org.apache.giraph.GiraphConfiguration;
 import org.apache.giraph.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.graph.TaskInfo;
+import org.apache.giraph.utils.ProgressableUtils;
+import org.apache.hadoop.util.Progressable;
 import org.apache.log4j.Logger;
 import org.jboss.netty.bootstrap.ServerBootstrap;
 import org.jboss.netty.channel.Channel;
@@ -80,6 +82,8 @@ else[HADOOP_NON_SECURE]*/
   private static final Logger LOG = Logger.getLogger(NettyServer.class);
   /** Configuration */
   private final ImmutableClassesGiraphConfiguration conf;
+  /** Progressable for reporting progress */
+  private final Progressable progressable;
   /** Factory of channels */
   private ChannelFactory channelFactory;
   /** Accepted channels */
@@ -126,11 +130,13 @@ else[HADOOP_NON_SECURE]*/
    * @param conf Configuration to use
    * @param requestServerHandlerFactory Factory for request handlers
    * @param myTaskInfo Current task info
+   * @param progressable Progressable for reporting progress
    */
   public NettyServer(ImmutableClassesGiraphConfiguration conf,
       RequestServerHandler.Factory requestServerHandlerFactory,
-      TaskInfo myTaskInfo) {
+      TaskInfo myTaskInfo, Progressable progressable) {
     this.conf = conf;
+    this.progressable = progressable;
     this.requestServerHandlerFactory = requestServerHandlerFactory;
     /*if[HADOOP_NON_SECURE]
     else[HADOOP_NON_SECURE]*/
@@ -203,13 +209,15 @@ else[HADOOP_NON_SECURE]*/
    * @param conf Configuration to use
    * @param requestServerHandlerFactory Factory for request handlers
    * @param myTaskInfo Current task info
+   * @param progressable Progressable for reporting progress
    * @param saslServerHandlerFactory  Factory for SASL handlers
    */
   public NettyServer(ImmutableClassesGiraphConfiguration conf,
                      RequestServerHandler.Factory requestServerHandlerFactory,
                      TaskInfo myTaskInfo,
+                     Progressable progressable,
                      SaslServerHandler.Factory saslServerHandlerFactory) {
-    this(conf, requestServerHandlerFactory, myTaskInfo);
+    this(conf, requestServerHandlerFactory, myTaskInfo, progressable);
     this.saslServerHandlerFactory = saslServerHandlerFactory;
   }
 /*end[HADOOP_NON_SECURE]*/
@@ -353,11 +361,14 @@ else[HADOOP_NON_SECURE]*/
     if (LOG.isInfoEnabled()) {
       LOG.info("stop: Halting netty server");
     }
-    accepted.close().awaitUninterruptibly();
+    ProgressableUtils.awaitChannelGroupFuture(accepted.close(), progressable);
     bossExecutorService.shutdownNow();
     workerExecutorService.shutdownNow();
     bootstrap.releaseExternalResources();
     channelFactory.releaseExternalResources();
+    if (LOG.isInfoEnabled()) {
+      LOG.info("stop: Netty server halted");
+    }
   }
 
   public InetSocketAddress getMyAddress() {

Modified: 
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
URL: 
http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java?rev=1415806&r1=1415805&r2=1415806&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java Fri Nov 30 19:50:06 2012
@@ -91,7 +91,7 @@ public class NettyWorkerServer<I extends
 
     nettyServer = new NettyServer(conf,
         new WorkerRequestServerHandler.Factory<I, V, E, M>(serverData),
-        service.getWorkerInfo());
+        service.getWorkerInfo(), context);
     nettyServer.start();
   }
 

Modified: 
giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceMaster.java
URL: 
http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceMaster.java?rev=1415806&r1=1415805&r2=1415806&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceMaster.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceMaster.java Fri Nov 30 19:50:06 2012
@@ -802,7 +802,8 @@ public class BspServiceMaster<I extends 
           aggregatorHandler.initialize(this);
 
           masterInfo = new MasterInfo();
-          masterServer = new NettyMasterServer(getConfiguration(), this);
+          masterServer =
+              new NettyMasterServer(getConfiguration(), this, getContext());
           masterInfo.setInetSocketAddress(masterServer.getMyAddress());
           masterClient =
               new NettyMasterClient(getContext(), getConfiguration(), this);

Modified: 
giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ProgressableUtils.java
URL: 
http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ProgressableUtils.java?rev=1415806&r1=1415805&r2=1415806&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ProgressableUtils.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ProgressableUtils.java Fri Nov 30 19:50:06 2012
@@ -20,6 +20,7 @@ package org.apache.giraph.utils;
 
 import org.apache.hadoop.util.Progressable;
 import org.apache.log4j.Logger;
+import org.jboss.netty.channel.group.ChannelGroupFuture;
 
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
@@ -36,125 +37,260 @@ public class ProgressableUtils {
   private static final int MSEC_PERIOD = 60 * 1000;
 
   /** Do not instantiate. */
-  private ProgressableUtils() { }
+  private ProgressableUtils() {
+  }
 
   /**
    * Wait for executor tasks to terminate, while periodically reporting
    * progress.
    *
-   * @param executor Executor which we are waiting for
+   * @param executor     Executor which we are waiting for
    * @param progressable Progressable for reporting progress (Job context)
    */
   public static void awaitExecutorTermination(ExecutorService executor,
       Progressable progressable) {
-    while (!awaitExecutorTermination(executor, progressable, MSEC_PERIOD)) {
-      progressable.progress();
+    waitForever(new ExecutorServiceWaitable(executor), progressable);
+  }
+
+  /**
+   * Wait for the result of the future to be ready, while periodically
+   * reporting progress.
+   *
+   * @param <T>          Type of the return value of the future
+   * @param future       Future
+   * @param progressable Progressable for reporting progress (Job context)
+   * @return Computed result of the future.
+   */
+  public static <T> T getFutureResult(Future<T> future,
+      Progressable progressable) {
+    return waitForever(new FutureWaitable<T>(future), progressable);
+  }
+
+  /**
+   * Wait for {@link ChannelGroupFuture} to finish, while periodically
+   * reporting progress.
+   *
+   * @param future       ChannelGroupFuture
+   * @param progressable Progressable for reporting progress (Job context)
+   */
+  public static void awaitChannelGroupFuture(ChannelGroupFuture future,
+      Progressable progressable) {
+    waitForever(new ChannelGroupFutureWaitable(future), progressable);
+  }
+
+  /**
+   * Wait forever for waitable to finish. Periodically reports progress.
+   *
+   * @param waitable Waitable which we wait for
+   * @param progressable Progressable for reporting progress (Job context)
+   * @param <T> Result type
+   * @return Result of waitable
+   */
+  private static <T> T waitForever(Waitable<T> waitable,
+      Progressable progressable) {
+    while (true) {
+      waitFor(waitable, progressable, MSEC_PERIOD);
+      if (waitable.isFinished()) {
+        try {
+          return waitable.getResult();
+        } catch (ExecutionException e) {
+          throw new IllegalStateException("waitForever: " +
+              "ExecutionException occurred while waiting for " + waitable, e);
+        } catch (InterruptedException e) {
+          throw new IllegalStateException("waitForever: " +
+              "InterruptedException occurred while waiting for " + waitable, e);
+        }
+      }
     }
   }
 
   /**
-   * Wait maximum given number of milliseconds for executor tasks to terminate,
-   * while periodically reporting progress.
+   *  Wait for desired number of milliseconds for waitable to finish.
+   *  Periodically reports progress.
    *
-   * @param executor Executor which we are waiting for
+   * @param waitable Waitable which we wait for
    * @param progressable Progressable for reporting progress (Job context)
-   * @param remainingWaitMsecs Number of milliseconds to wait
-   * @return Whether all executor tasks terminated or not
+   * @param msecs Number of milliseconds to wait for
+   * @param <T> Result type
+   * @return Result of waitable
    */
-  public static boolean awaitExecutorTermination(ExecutorService executor,
-      Progressable progressable, int remainingWaitMsecs) {
-    long timeoutTimeMsecs = System.currentTimeMillis() + remainingWaitMsecs;
+  private static <T> T waitFor(Waitable<T> waitable, Progressable progressable,
+      int msecs) {
+    long timeoutTimeMsecs = System.currentTimeMillis() + msecs;
     int currentWaitMsecs;
     while (true) {
-      currentWaitMsecs = Math.min(remainingWaitMsecs, MSEC_PERIOD);
+      currentWaitMsecs = Math.min(msecs, MSEC_PERIOD);
       try {
-        if (executor.awaitTermination(currentWaitMsecs,
-            TimeUnit.MILLISECONDS)) {
-          return true;
+        waitable.waitFor(currentWaitMsecs);
+        if (waitable.isFinished()) {
+          return waitable.getResult();
         }
       } catch (InterruptedException e) {
-        throw new IllegalStateException("awaitExecutorTermination: " +
-            "InterruptedException occurred while waiting for executor's " +
-            "tasks to terminate", e);
+        throw new IllegalStateException("waitFor: " +
+            "InterruptedException occurred while waiting for " + waitable, e);
+      } catch (ExecutionException e) {
+        throw new IllegalStateException("waitFor: " +
+            "ExecutionException occurred while waiting for " + waitable, e);
+      } catch (TimeoutException e) {
+        throw new IllegalStateException("waitFor: " +
+            "TimeoutException occurred while waiting for " + waitable, e);
       }
       if (LOG.isInfoEnabled()) {
-        LOG.info("awaitExecutorTermination: " +
-            "Waiting for executor tasks to terminate " + executor.toString());
+        LOG.info("waitFor: Waiting for " + waitable);
       }
       if (System.currentTimeMillis() >= timeoutTimeMsecs) {
-        return false;
+        return waitable.getTimeoutResult();
       }
       progressable.progress();
-      remainingWaitMsecs = Math.max(0, remainingWaitMsecs - currentWaitMsecs);
+      msecs = Math.max(0, msecs - currentWaitMsecs);
     }
   }
 
+  /**
+   * Interface for waiting on a result from some operation.
+   *
+   * @param <T> Result type.
+   */
+  private interface Waitable<T> {
+    /**
+     * Wait for desired number of milliseconds for waitable to finish.
+     *
+     * @param msecs Number of milliseconds to wait.
+     */
+    void waitFor(int msecs) throws InterruptedException, ExecutionException,
+        TimeoutException;
+
+    /**
+     * Check if waitable is finished.
+     *
+     * @return True iff waitable finished.
+     */
+    boolean isFinished();
+
+    /**
+     * Get result of waitable. Call after isFinished() returns true.
+     *
+     * @return Result of waitable.
+     */
+    T getResult() throws ExecutionException, InterruptedException;
+
+    /**
+     * Get the result which we want to return in case of timeout.
+     *
+     * @return Timeout result.
+     */
+    T getTimeoutResult();
+  }
 
   /**
-   * Wait for the result of the future to be ready, while periodically
-   * reporting progress.
+   * abstract class for waitables which don't have the result.
+   */
+  private abstract static class WaitableWithoutResult
+      implements Waitable<Void> {
+    @Override
+    public Void getResult() throws ExecutionException, InterruptedException {
+      return null;
+    }
+
+    @Override
+    public Void getTimeoutResult() {
+      return null;
+    }
+  }
+
+  /**
+   * {@link Waitable} for waiting on a result of a {@link Future}.
    *
-   * @param <T> Type of the return value of the future
-   * @param future Future
-   * @param progressable Progressable for reporting progress (Job context)
-   * @return Computed result of the future.
+   * @param <T> Future result type
    */
-  public static <T> T getFutureResult(Future<T> future,
-                                      Progressable progressable) {
-    while (!future.isDone()) {
-      tryGetFutureResult(future, progressable, MSEC_PERIOD);
+  private static class FutureWaitable<T> implements Waitable<T> {
+    /** Future which we want to wait for */
+    private final Future<T> future;
+
+    /**
+     * Constructor
+     *
+     * @param future Future which we want to wait for
+     */
+    public FutureWaitable(Future<T> future) {
+      this.future = future;
+    }
+
+    @Override
+    public void waitFor(int msecs) throws InterruptedException,
+        ExecutionException, TimeoutException {
+      future.get(msecs, TimeUnit.MILLISECONDS);
+    }
+
+    @Override
+    public boolean isFinished() {
+      return future.isDone();
     }
 
-    try {
+    @Override
+    public T getResult() throws ExecutionException, InterruptedException {
       return future.get();
-    } catch (InterruptedException e) {
-      throw new IllegalStateException("get: " +
-          "InterruptedException occurred while waiting for future result", e);
-    } catch (ExecutionException e) {
-      throw new IllegalStateException("get: " +
-          "ExecutionException occurred while waiting for future result", e);
+    }
+
+    @Override
+    public T getTimeoutResult() {
+      return null;
     }
   }
 
   /**
-   * Wait maximum given number of milliseconds for result to become available,
-   * while periodically reporting progress.
-   *
-   * @param <T> Type of the return value of the future
-   * @param future Future
-   * @param progressable Progressable for reporting progress (Job context)
-   * @param msecs Number of milliseconds to wait
-   * @return Future result
+   * {@link Waitable} for waiting on an {@link ExecutorService} to terminate.
    */
-  public static <T> T tryGetFutureResult(
-      Future<T> future, Progressable progressable, int msecs) {
-    long maxMsecs = System.currentTimeMillis() + msecs;
-    int curMsecTimeout;
-    while (true) {
-      curMsecTimeout = Math.min(msecs, MSEC_PERIOD);
-      try {
-        future.get(curMsecTimeout, TimeUnit.MILLISECONDS);
-        if (future.isDone()) {
-          return future.get();
-        }
-      } catch (InterruptedException e) {
-        throw new IllegalStateException("tryGet: " +
-            "InterruptedException occurred while waiting for future result",
-            e);
-      } catch (ExecutionException e) {
-        throw new IllegalStateException("tryGet: " +
-            "ExecutionException occurred while waiting for future result", e);
-      } catch (TimeoutException e) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("tryGetFutureResult: Timeout occurred");
-        }
-      }
+  private static class ExecutorServiceWaitable extends WaitableWithoutResult {
+    /** ExecutorService which we want to wait for */
+    private final ExecutorService executorService;
+
+    /**
+     * Constructor
+     *
+     * @param executorService ExecutorService which we want to wait for
+     */
+    public ExecutorServiceWaitable(ExecutorService executorService) {
+      this.executorService = executorService;
+    }
 
-      progressable.progress();
-      if (System.currentTimeMillis() >= maxMsecs) {
-        return null;
-      }
-      msecs = Math.max(0, msecs - curMsecTimeout);
+    @Override
+    public void waitFor(int msecs) throws InterruptedException {
+      executorService.awaitTermination(msecs, TimeUnit.MILLISECONDS);
+    }
+
+    @Override
+    public boolean isFinished() {
+      return executorService.isTerminated();
+    }
+  }
+
+  /**
+   * {@link Waitable} for waiting on a {@link ChannelGroupFutureWaitable} to
+   * terminate.
+   */
+  private static class ChannelGroupFutureWaitable extends
+      WaitableWithoutResult {
+    /** ChannelGroupFuture which we want to wait for */
+    private final ChannelGroupFuture future;
+
+    /**
+     * Constructor
+     *
+     * @param future ChannelGroupFuture which we want to wait for
+     */
+    public ChannelGroupFutureWaitable(ChannelGroupFuture future) {
+      this.future = future;
+    }
+
+    @Override
+    public void waitFor(int msecs) throws InterruptedException {
+      future.await(msecs, TimeUnit.MILLISECONDS);
+    }
+
+    @Override
+    public boolean isFinished() {
+      return future.isDone();
     }
   }
 }

Modified: 
giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/ConnectionTest.java
URL: 
http://svn.apache.org/viewvc/giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/ConnectionTest.java?rev=1415806&r1=1415805&r2=1415806&view=diff
==============================================================================
--- giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/ConnectionTest.java (original)
+++ giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/ConnectionTest.java Fri Nov 30 19:50:06 2012
@@ -77,7 +77,8 @@ public class ConnectionTest {
     WorkerInfo workerInfo = new WorkerInfo(-1);
     NettyServer server =
         new NettyServer(conf,
-            new WorkerRequestServerHandler.Factory(serverData), workerInfo);
+            new WorkerRequestServerHandler.Factory(serverData), workerInfo,
+            context);
     server.start();
     workerInfo.setInetSocketAddress(server.getMyAddress());
 
@@ -107,19 +108,21 @@ public class ConnectionTest {
 
     WorkerInfo workerInfo1 = new WorkerInfo(1);
     NettyServer server1 =
-        new NettyServer(conf, requestServerHandlerFactory, workerInfo1);
+        new NettyServer(conf, requestServerHandlerFactory, workerInfo1, context);
     server1.start();
     workerInfo1.setInetSocketAddress(server1.getMyAddress());
 
     WorkerInfo workerInfo2 = new WorkerInfo(2);
     NettyServer server2 =
-        new NettyServer(conf, requestServerHandlerFactory, workerInfo2);
+        new NettyServer(conf, requestServerHandlerFactory, workerInfo2,
+            context);
     server2.start();
     workerInfo2.setInetSocketAddress(server2.getMyAddress());
 
     WorkerInfo workerInfo3 = new WorkerInfo(3);
     NettyServer server3 =
-        new NettyServer(conf, requestServerHandlerFactory, workerInfo3);
+        new NettyServer(conf, requestServerHandlerFactory, workerInfo3,
+            context);
     server3.start();
     workerInfo3.setInetSocketAddress(server3.getMyAddress());
 
@@ -149,7 +152,8 @@ public class ConnectionTest {
         MockUtils.createNewServerData(conf, context);
     WorkerInfo workerInfo = new WorkerInfo(-1);
     NettyServer server = new NettyServer(conf,
-        new WorkerRequestServerHandler.Factory(serverData), workerInfo);
+        new WorkerRequestServerHandler.Factory(serverData), workerInfo,
+            context);
     server.start();
     workerInfo.setInetSocketAddress(server.getMyAddress());
 

Modified: 
giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestFailureTest.java
URL: 
http://svn.apache.org/viewvc/giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestFailureTest.java?rev=1415806&r1=1415805&r2=1415806&view=diff
==============================================================================
--- 
giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestFailureTest.java (original)
+++ giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestFailureTest.java Fri Nov 30 19:50:06 2012
@@ -161,7 +161,8 @@ public class RequestFailureTest {
     serverData = MockUtils.createNewServerData(conf, context);
     WorkerInfo workerInfo = new WorkerInfo(-1);
     server = new NettyServer(conf,
-        new WorkerRequestServerHandler.Factory(serverData), workerInfo);
+        new WorkerRequestServerHandler.Factory(serverData), workerInfo,
+            context);
     server.start();
     workerInfo.setInetSocketAddress(server.getMyAddress());
     client = new NettyClient(context, conf, new WorkerInfo());

Modified: 
giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestTest.java
URL: 
http://svn.apache.org/viewvc/giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestTest.java?rev=1415806&r1=1415805&r2=1415806&view=diff
==============================================================================
--- giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestTest.java (original)
+++ giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestTest.java Fri Nov 30 19:50:06 2012
@@ -94,7 +94,8 @@ public class RequestTest {
     serverData = MockUtils.createNewServerData(conf, context);
     workerInfo = new WorkerInfo(-1);
     server = new NettyServer(conf,
-        new WorkerRequestServerHandler.Factory(serverData), workerInfo);
+        new WorkerRequestServerHandler.Factory(serverData), workerInfo,
+            context);
     server.start();
     workerInfo.setInetSocketAddress(server.getMyAddress());
     client = new NettyClient(context, conf, new WorkerInfo());

Modified: 
giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/SaslConnectionTest.java
URL: 
http://svn.apache.org/viewvc/giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/SaslConnectionTest.java?rev=1415806&r1=1415805&r2=1415806&view=diff
==============================================================================
--- 
giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/SaslConnectionTest.java (original)
+++ giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/SaslConnectionTest.java Fri Nov 30 19:50:06 2012
@@ -88,6 +88,7 @@ public class SaslConnectionTest {
         new NettyServer(conf,
             new WorkerRequestServerHandler.Factory(serverData),
             workerInfo,
+            context,
             mockedSaslServerFactory);
     server.start();
     workerInfo.setInetSocketAddress(server.getMyAddress());


Reply via email to