Author: maja
Date: Fri Nov 30 19:50:06 2012
New Revision: 1415806
URL: http://svn.apache.org/viewvc?rev=1415806&view=rev
Log:
GIRAPH-437: Missing progress calls when stopping Netty server
Modified:
giraph/trunk/CHANGELOG
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyMasterServer.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyServer.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceMaster.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ProgressableUtils.java
giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/ConnectionTest.java
giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestFailureTest.java
giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestTest.java
giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/SaslConnectionTest.java
Modified: giraph/trunk/CHANGELOG
URL:
http://svn.apache.org/viewvc/giraph/trunk/CHANGELOG?rev=1415806&r1=1415805&r2=1415806&view=diff
==============================================================================
--- giraph/trunk/CHANGELOG (original)
+++ giraph/trunk/CHANGELOG Fri Nov 30 19:50:06 2012
@@ -1,6 +1,8 @@
Giraph Change Log
Release 0.2.0 - unreleased
+ GIRAPH-437: Missing progress calls when stopping Netty server (majakabiljo)
+
GIRAPH-439: Fix naming of input superstep counter (apresta)
GIRAPH-424: Fix hashCode modulo computation (majakabiljo)
Modified:
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyMasterServer.java
URL:
http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyMasterServer.java?rev=1415806&r1=1415805&r2=1415806&view=diff
==============================================================================
---
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyMasterServer.java
(original)
+++
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyMasterServer.java
Fri Nov 30 19:50:06 2012
@@ -22,6 +22,7 @@ import org.apache.giraph.ImmutableClasse
import org.apache.giraph.bsp.CentralizedServiceMaster;
import org.apache.giraph.comm.netty.handler.MasterRequestServerHandler;
import org.apache.giraph.comm.MasterServer;
+import org.apache.hadoop.util.Progressable;
import java.net.InetSocketAddress;
@@ -37,12 +38,14 @@ public class NettyMasterServer implement
*
* @param conf Hadoop configuration
* @param service Centralized service
+ * @param progressable Progressable for reporting progress
*/
public NettyMasterServer(ImmutableClassesGiraphConfiguration conf,
- CentralizedServiceMaster<?, ?, ?, ?> service) {
+ CentralizedServiceMaster<?, ?, ?, ?> service,
+ Progressable progressable) {
nettyServer = new NettyServer(conf,
new MasterRequestServerHandler.Factory(service.getAggregatorHandler()),
- service.getMasterInfo());
+ service.getMasterInfo(), progressable);
nettyServer.start();
}
Modified:
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyServer.java
URL:
http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyServer.java?rev=1415806&r1=1415805&r2=1415806&view=diff
==============================================================================
---
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyServer.java
(original)
+++
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyServer.java
Fri Nov 30 19:50:06 2012
@@ -40,6 +40,8 @@ import java.util.concurrent.TimeUnit;
import org.apache.giraph.GiraphConfiguration;
import org.apache.giraph.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.graph.TaskInfo;
+import org.apache.giraph.utils.ProgressableUtils;
+import org.apache.hadoop.util.Progressable;
import org.apache.log4j.Logger;
import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.Channel;
@@ -80,6 +82,8 @@ else[HADOOP_NON_SECURE]*/
private static final Logger LOG = Logger.getLogger(NettyServer.class);
/** Configuration */
private final ImmutableClassesGiraphConfiguration conf;
+ /** Progressable for reporting progress */
+ private final Progressable progressable;
/** Factory of channels */
private ChannelFactory channelFactory;
/** Accepted channels */
@@ -126,11 +130,13 @@ else[HADOOP_NON_SECURE]*/
* @param conf Configuration to use
* @param requestServerHandlerFactory Factory for request handlers
* @param myTaskInfo Current task info
+ * @param progressable Progressable for reporting progress
*/
public NettyServer(ImmutableClassesGiraphConfiguration conf,
RequestServerHandler.Factory requestServerHandlerFactory,
- TaskInfo myTaskInfo) {
+ TaskInfo myTaskInfo, Progressable progressable) {
this.conf = conf;
+ this.progressable = progressable;
this.requestServerHandlerFactory = requestServerHandlerFactory;
/*if[HADOOP_NON_SECURE]
else[HADOOP_NON_SECURE]*/
@@ -203,13 +209,15 @@ else[HADOOP_NON_SECURE]*/
* @param conf Configuration to use
* @param requestServerHandlerFactory Factory for request handlers
* @param myTaskInfo Current task info
+ * @param progressable Progressable for reporting progress
* @param saslServerHandlerFactory Factory for SASL handlers
*/
public NettyServer(ImmutableClassesGiraphConfiguration conf,
RequestServerHandler.Factory requestServerHandlerFactory,
TaskInfo myTaskInfo,
+ Progressable progressable,
SaslServerHandler.Factory saslServerHandlerFactory) {
- this(conf, requestServerHandlerFactory, myTaskInfo);
+ this(conf, requestServerHandlerFactory, myTaskInfo, progressable);
this.saslServerHandlerFactory = saslServerHandlerFactory;
}
/*end[HADOOP_NON_SECURE]*/
@@ -353,11 +361,14 @@ else[HADOOP_NON_SECURE]*/
if (LOG.isInfoEnabled()) {
LOG.info("stop: Halting netty server");
}
- accepted.close().awaitUninterruptibly();
+ ProgressableUtils.awaitChannelGroupFuture(accepted.close(), progressable);
bossExecutorService.shutdownNow();
workerExecutorService.shutdownNow();
bootstrap.releaseExternalResources();
channelFactory.releaseExternalResources();
+ if (LOG.isInfoEnabled()) {
+ LOG.info("stop: Netty server halted");
+ }
}
public InetSocketAddress getMyAddress() {
Modified:
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
URL:
http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java?rev=1415806&r1=1415805&r2=1415806&view=diff
==============================================================================
---
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
(original)
+++
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
Fri Nov 30 19:50:06 2012
@@ -91,7 +91,7 @@ public class NettyWorkerServer<I extends
nettyServer = new NettyServer(conf,
new WorkerRequestServerHandler.Factory<I, V, E, M>(serverData),
- service.getWorkerInfo());
+ service.getWorkerInfo(), context);
nettyServer.start();
}
Modified:
giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceMaster.java
URL:
http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceMaster.java?rev=1415806&r1=1415805&r2=1415806&view=diff
==============================================================================
---
giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceMaster.java
(original)
+++
giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceMaster.java
Fri Nov 30 19:50:06 2012
@@ -802,7 +802,8 @@ public class BspServiceMaster<I extends
aggregatorHandler.initialize(this);
masterInfo = new MasterInfo();
- masterServer = new NettyMasterServer(getConfiguration(), this);
+ masterServer =
+ new NettyMasterServer(getConfiguration(), this, getContext());
masterInfo.setInetSocketAddress(masterServer.getMyAddress());
masterClient =
new NettyMasterClient(getContext(), getConfiguration(), this);
Modified:
giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ProgressableUtils.java
URL:
http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ProgressableUtils.java?rev=1415806&r1=1415805&r2=1415806&view=diff
==============================================================================
---
giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ProgressableUtils.java
(original)
+++
giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ProgressableUtils.java
Fri Nov 30 19:50:06 2012
@@ -20,6 +20,7 @@ package org.apache.giraph.utils;
import org.apache.hadoop.util.Progressable;
import org.apache.log4j.Logger;
+import org.jboss.netty.channel.group.ChannelGroupFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
@@ -36,125 +37,260 @@ public class ProgressableUtils {
private static final int MSEC_PERIOD = 60 * 1000;
/** Do not instantiate. */
- private ProgressableUtils() { }
+ private ProgressableUtils() {
+ }
/**
* Wait for executor tasks to terminate, while periodically reporting
* progress.
*
- * @param executor Executor which we are waiting for
+ * @param executor Executor which we are waiting for
* @param progressable Progressable for reporting progress (Job context)
*/
public static void awaitExecutorTermination(ExecutorService executor,
Progressable progressable) {
- while (!awaitExecutorTermination(executor, progressable, MSEC_PERIOD)) {
- progressable.progress();
+ waitForever(new ExecutorServiceWaitable(executor), progressable);
+ }
+
+ /**
+ * Wait for the result of the future to be ready, while periodically
+ * reporting progress.
+ *
+ * @param <T> Type of the return value of the future
+ * @param future Future
+ * @param progressable Progressable for reporting progress (Job context)
+ * @return Computed result of the future.
+ */
+ public static <T> T getFutureResult(Future<T> future,
+ Progressable progressable) {
+ return waitForever(new FutureWaitable<T>(future), progressable);
+ }
+
+ /**
+ * Wait for {@link ChannelGroupFuture} to finish, while periodically
+ * reporting progress.
+ *
+ * @param future ChannelGroupFuture
+ * @param progressable Progressable for reporting progress (Job context)
+ */
+ public static void awaitChannelGroupFuture(ChannelGroupFuture future,
+ Progressable progressable) {
+ waitForever(new ChannelGroupFutureWaitable(future), progressable);
+ }
+
+ /**
+ * Wait forever for waitable to finish. Periodically reports progress.
+ *
+ * @param waitable Waitable which we wait for
+ * @param progressable Progressable for reporting progress (Job context)
+ * @param <T> Result type
+ * @return Result of waitable
+ */
+ private static <T> T waitForever(Waitable<T> waitable,
+ Progressable progressable) {
+ while (true) {
+ waitFor(waitable, progressable, MSEC_PERIOD);
+ if (waitable.isFinished()) {
+ try {
+ return waitable.getResult();
+ } catch (ExecutionException e) {
+ throw new IllegalStateException("waitForever: " +
+ "ExecutionException occurred while waiting for " + waitable, e);
+ } catch (InterruptedException e) {
+ throw new IllegalStateException("waitForever: " +
+ "InterruptedException occurred while waiting for " + waitable,
e);
+ }
+ }
}
}
/**
- * Wait maximum given number of milliseconds for executor tasks to terminate,
- * while periodically reporting progress.
+ * Wait for desired number of milliseconds for waitable to finish.
+ * Periodically reports progress.
*
- * @param executor Executor which we are waiting for
+ * @param waitable Waitable which we wait for
* @param progressable Progressable for reporting progress (Job context)
- * @param remainingWaitMsecs Number of milliseconds to wait
- * @return Whether all executor tasks terminated or not
+ * @param msecs Number of milliseconds to wait for
+ * @param <T> Result type
+ * @return Result of waitable
*/
- public static boolean awaitExecutorTermination(ExecutorService executor,
- Progressable progressable, int remainingWaitMsecs) {
- long timeoutTimeMsecs = System.currentTimeMillis() + remainingWaitMsecs;
+ private static <T> T waitFor(Waitable<T> waitable, Progressable progressable,
+ int msecs) {
+ long timeoutTimeMsecs = System.currentTimeMillis() + msecs;
int currentWaitMsecs;
while (true) {
- currentWaitMsecs = Math.min(remainingWaitMsecs, MSEC_PERIOD);
+ currentWaitMsecs = Math.min(msecs, MSEC_PERIOD);
try {
- if (executor.awaitTermination(currentWaitMsecs,
- TimeUnit.MILLISECONDS)) {
- return true;
+ waitable.waitFor(currentWaitMsecs);
+ if (waitable.isFinished()) {
+ return waitable.getResult();
}
} catch (InterruptedException e) {
- throw new IllegalStateException("awaitExecutorTermination: " +
- "InterruptedException occurred while waiting for executor's " +
- "tasks to terminate", e);
+ throw new IllegalStateException("waitFor: " +
+ "InterruptedException occurred while waiting for " + waitable, e);
+ } catch (ExecutionException e) {
+ throw new IllegalStateException("waitFor: " +
+ "ExecutionException occurred while waiting for " + waitable, e);
+ } catch (TimeoutException e) {
+ throw new IllegalStateException("waitFor: " +
+ "TimeoutException occurred while waiting for " + waitable, e);
}
if (LOG.isInfoEnabled()) {
- LOG.info("awaitExecutorTermination: " +
- "Waiting for executor tasks to terminate " + executor.toString());
+ LOG.info("waitFor: Waiting for " + waitable);
}
if (System.currentTimeMillis() >= timeoutTimeMsecs) {
- return false;
+ return waitable.getTimeoutResult();
}
progressable.progress();
- remainingWaitMsecs = Math.max(0, remainingWaitMsecs - currentWaitMsecs);
+ msecs = Math.max(0, msecs - currentWaitMsecs);
}
}
+ /**
+ * Interface for waiting on a result from some operation.
+ *
+ * @param <T> Result type.
+ */
+ private interface Waitable<T> {
+ /**
+ * Wait for desired number of milliseconds for waitable to finish.
+ *
+ * @param msecs Number of milliseconds to wait.
+ */
+ void waitFor(int msecs) throws InterruptedException, ExecutionException,
+ TimeoutException;
+
+ /**
+ * Check if waitable is finished.
+ *
+ * @return True iff waitable finished.
+ */
+ boolean isFinished();
+
+ /**
+ * Get result of waitable. Call after isFinished() returns true.
+ *
+ * @return Result of waitable.
+ */
+ T getResult() throws ExecutionException, InterruptedException;
+
+ /**
+ * Get the result which we want to return in case of timeout.
+ *
+ * @return Timeout result.
+ */
+ T getTimeoutResult();
+ }
/**
- * Wait for the result of the future to be ready, while periodically
- * reporting progress.
+ * abstract class for waitables which don't have the result.
+ */
+ private abstract static class WaitableWithoutResult
+ implements Waitable<Void> {
+ @Override
+ public Void getResult() throws ExecutionException, InterruptedException {
+ return null;
+ }
+
+ @Override
+ public Void getTimeoutResult() {
+ return null;
+ }
+ }
+
+ /**
+ * {@link Waitable} for waiting on a result of a {@link Future}.
*
- * @param <T> Type of the return value of the future
- * @param future Future
- * @param progressable Progressable for reporting progress (Job context)
- * @return Computed result of the future.
+ * @param <T> Future result type
*/
- public static <T> T getFutureResult(Future<T> future,
- Progressable progressable) {
- while (!future.isDone()) {
- tryGetFutureResult(future, progressable, MSEC_PERIOD);
+ private static class FutureWaitable<T> implements Waitable<T> {
+ /** Future which we want to wait for */
+ private final Future<T> future;
+
+ /**
+ * Constructor
+ *
+ * @param future Future which we want to wait for
+ */
+ public FutureWaitable(Future<T> future) {
+ this.future = future;
+ }
+
+ @Override
+ public void waitFor(int msecs) throws InterruptedException,
+ ExecutionException, TimeoutException {
+ future.get(msecs, TimeUnit.MILLISECONDS);
+ }
+
+ @Override
+ public boolean isFinished() {
+ return future.isDone();
}
- try {
+ @Override
+ public T getResult() throws ExecutionException, InterruptedException {
return future.get();
- } catch (InterruptedException e) {
- throw new IllegalStateException("get: " +
- "InterruptedException occurred while waiting for future result", e);
- } catch (ExecutionException e) {
- throw new IllegalStateException("get: " +
- "ExecutionException occurred while waiting for future result", e);
+ }
+
+ @Override
+ public T getTimeoutResult() {
+ return null;
}
}
/**
- * Wait maximum given number of milliseconds for result to become available,
- * while periodically reporting progress.
- *
- * @param <T> Type of the return value of the future
- * @param future Future
- * @param progressable Progressable for reporting progress (Job context)
- * @param msecs Number of milliseconds to wait
- * @return Future result
+ * {@link Waitable} for waiting on an {@link ExecutorService} to terminate.
*/
- public static <T> T tryGetFutureResult(
- Future<T> future, Progressable progressable, int msecs) {
- long maxMsecs = System.currentTimeMillis() + msecs;
- int curMsecTimeout;
- while (true) {
- curMsecTimeout = Math.min(msecs, MSEC_PERIOD);
- try {
- future.get(curMsecTimeout, TimeUnit.MILLISECONDS);
- if (future.isDone()) {
- return future.get();
- }
- } catch (InterruptedException e) {
- throw new IllegalStateException("tryGet: " +
- "InterruptedException occurred while waiting for future result",
- e);
- } catch (ExecutionException e) {
- throw new IllegalStateException("tryGet: " +
- "ExecutionException occurred while waiting for future result", e);
- } catch (TimeoutException e) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("tryGetFutureResult: Timeout occurred");
- }
- }
+ private static class ExecutorServiceWaitable extends WaitableWithoutResult {
+ /** ExecutorService which we want to wait for */
+ private final ExecutorService executorService;
+
+ /**
+ * Constructor
+ *
+ * @param executorService ExecutorService which we want to wait for
+ */
+ public ExecutorServiceWaitable(ExecutorService executorService) {
+ this.executorService = executorService;
+ }
- progressable.progress();
- if (System.currentTimeMillis() >= maxMsecs) {
- return null;
- }
- msecs = Math.max(0, msecs - curMsecTimeout);
+ @Override
+ public void waitFor(int msecs) throws InterruptedException {
+ executorService.awaitTermination(msecs, TimeUnit.MILLISECONDS);
+ }
+
+ @Override
+ public boolean isFinished() {
+ return executorService.isTerminated();
+ }
+ }
+
+ /**
+ * {@link Waitable} for waiting on a {@link ChannelGroupFutureWaitable} to
+ * terminate.
+ */
+ private static class ChannelGroupFutureWaitable extends
+ WaitableWithoutResult {
+ /** ChannelGroupFuture which we want to wait for */
+ private final ChannelGroupFuture future;
+
+ /**
+ * Constructor
+ *
+ * @param future ChannelGroupFuture which we want to wait for
+ */
+ public ChannelGroupFutureWaitable(ChannelGroupFuture future) {
+ this.future = future;
+ }
+
+ @Override
+ public void waitFor(int msecs) throws InterruptedException {
+ future.await(msecs, TimeUnit.MILLISECONDS);
+ }
+
+ @Override
+ public boolean isFinished() {
+ return future.isDone();
}
}
}
Modified:
giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/ConnectionTest.java
URL:
http://svn.apache.org/viewvc/giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/ConnectionTest.java?rev=1415806&r1=1415805&r2=1415806&view=diff
==============================================================================
---
giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/ConnectionTest.java
(original)
+++
giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/ConnectionTest.java
Fri Nov 30 19:50:06 2012
@@ -77,7 +77,8 @@ public class ConnectionTest {
WorkerInfo workerInfo = new WorkerInfo(-1);
NettyServer server =
new NettyServer(conf,
- new WorkerRequestServerHandler.Factory(serverData), workerInfo);
+ new WorkerRequestServerHandler.Factory(serverData), workerInfo,
+ context);
server.start();
workerInfo.setInetSocketAddress(server.getMyAddress());
@@ -107,19 +108,21 @@ public class ConnectionTest {
WorkerInfo workerInfo1 = new WorkerInfo(1);
NettyServer server1 =
- new NettyServer(conf, requestServerHandlerFactory, workerInfo1);
+ new NettyServer(conf, requestServerHandlerFactory, workerInfo1,
context);
server1.start();
workerInfo1.setInetSocketAddress(server1.getMyAddress());
WorkerInfo workerInfo2 = new WorkerInfo(2);
NettyServer server2 =
- new NettyServer(conf, requestServerHandlerFactory, workerInfo2);
+ new NettyServer(conf, requestServerHandlerFactory, workerInfo2,
+ context);
server2.start();
workerInfo2.setInetSocketAddress(server2.getMyAddress());
WorkerInfo workerInfo3 = new WorkerInfo(3);
NettyServer server3 =
- new NettyServer(conf, requestServerHandlerFactory, workerInfo3);
+ new NettyServer(conf, requestServerHandlerFactory, workerInfo3,
+ context);
server3.start();
workerInfo3.setInetSocketAddress(server3.getMyAddress());
@@ -149,7 +152,8 @@ public class ConnectionTest {
MockUtils.createNewServerData(conf, context);
WorkerInfo workerInfo = new WorkerInfo(-1);
NettyServer server = new NettyServer(conf,
- new WorkerRequestServerHandler.Factory(serverData), workerInfo);
+ new WorkerRequestServerHandler.Factory(serverData), workerInfo,
+ context);
server.start();
workerInfo.setInetSocketAddress(server.getMyAddress());
Modified:
giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestFailureTest.java
URL:
http://svn.apache.org/viewvc/giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestFailureTest.java?rev=1415806&r1=1415805&r2=1415806&view=diff
==============================================================================
---
giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestFailureTest.java
(original)
+++
giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestFailureTest.java
Fri Nov 30 19:50:06 2012
@@ -161,7 +161,8 @@ public class RequestFailureTest {
serverData = MockUtils.createNewServerData(conf, context);
WorkerInfo workerInfo = new WorkerInfo(-1);
server = new NettyServer(conf,
- new WorkerRequestServerHandler.Factory(serverData), workerInfo);
+ new WorkerRequestServerHandler.Factory(serverData), workerInfo,
+ context);
server.start();
workerInfo.setInetSocketAddress(server.getMyAddress());
client = new NettyClient(context, conf, new WorkerInfo());
Modified:
giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestTest.java
URL:
http://svn.apache.org/viewvc/giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestTest.java?rev=1415806&r1=1415805&r2=1415806&view=diff
==============================================================================
--- giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestTest.java
(original)
+++ giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestTest.java
Fri Nov 30 19:50:06 2012
@@ -94,7 +94,8 @@ public class RequestTest {
serverData = MockUtils.createNewServerData(conf, context);
workerInfo = new WorkerInfo(-1);
server = new NettyServer(conf,
- new WorkerRequestServerHandler.Factory(serverData), workerInfo);
+ new WorkerRequestServerHandler.Factory(serverData), workerInfo,
+ context);
server.start();
workerInfo.setInetSocketAddress(server.getMyAddress());
client = new NettyClient(context, conf, new WorkerInfo());
Modified:
giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/SaslConnectionTest.java
URL:
http://svn.apache.org/viewvc/giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/SaslConnectionTest.java?rev=1415806&r1=1415805&r2=1415806&view=diff
==============================================================================
---
giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/SaslConnectionTest.java
(original)
+++
giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/SaslConnectionTest.java
Fri Nov 30 19:50:06 2012
@@ -88,6 +88,7 @@ public class SaslConnectionTest {
new NettyServer(conf,
new WorkerRequestServerHandler.Factory(serverData),
workerInfo,
+ context,
mockedSaslServerFactory);
server.start();
workerInfo.setInetSocketAddress(server.getMyAddress());