HIVE-16020: LLAP : Reduce IPC connection misses (Rajesh Balamohan, Siddharth Seth, reviewed by Sergey Shelukhin)

Change-Id: Ibf7f4c1a9840fa1463a7c7c4a777e5a276254e2e


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/59cde667
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/59cde667
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/59cde667

Branch: refs/heads/branch-2.2
Commit: 59cde667d77b549409898fdb30a549223a251e32
Parents: 5be5391
Author: Prasanth Jayachandran <pjayachand...@hortonworks.com>
Authored: Mon Feb 27 19:44:19 2017 -0800
Committer: Owen O'Malley <omal...@apache.org>
Committed: Tue Mar 28 15:27:52 2017 -0700

----------------------------------------------------------------------
 .../hive/llap/daemon/impl/AMReporter.java       |  5 ++--
 .../llap/daemon/impl/ContainerRunnerImpl.java   | 16 ++++++----
 .../hive/llap/daemon/impl/LlapDaemon.java       |  8 +++--
 .../hadoop/hive/llap/daemon/impl/QueryInfo.java | 29 ++++++++++++++++++
 .../hive/llap/daemon/impl/QueryTracker.java     |  4 ++-
 .../llap/daemon/impl/TaskRunnerCallable.java    | 31 +++++++++++---------
 .../daemon/impl/TaskExecutorTestHelpers.java    |  4 ++-
 7 files changed, 72 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/59cde667/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java
index 32f070a..ede7e00 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java
@@ -105,7 +105,8 @@ public class AMReporter extends AbstractService {
   private final DaemonId daemonId;
 
   public AMReporter(int numExecutors, int maxThreads, 
AtomicReference<InetSocketAddress>
-      localAddress, QueryFailedHandler queryFailedHandler, Configuration conf, 
DaemonId daemonId) {
+      localAddress, QueryFailedHandler queryFailedHandler, Configuration conf, 
DaemonId daemonId,
+      SocketFactory socketFactory) {
     super(AMReporter.class.getName());
     this.localAddress = localAddress;
     this.queryFailedHandler = queryFailedHandler;
@@ -137,7 +138,7 @@ public class AMReporter extends AbstractService {
         .retryUpToMaximumTimeWithFixedSleep(retryTimeout, retrySleep,
             TimeUnit.MILLISECONDS);
 
-    this.socketFactory = NetUtils.getDefaultSocketFactory(conf);
+    this.socketFactory = socketFactory;
 
     LOG.info("Setting up AMReporter with " +
         "heartbeatInterval(ms)=" + heartbeatInterval +

http://git-wip-us.apache.org/repos/asf/hive/blob/59cde667/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
index 6908138..cc4eff0 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
@@ -83,6 +83,8 @@ import com.google.common.base.Preconditions;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.InvalidProtocolBufferException;
 
+import javax.net.SocketFactory;
+
 public class ContainerRunnerImpl extends CompositeService implements 
ContainerRunner, FragmentCompletionHandler, QueryFailedHandler {
 
   // TODO Setup a set of threads to process incoming requests.
@@ -107,12 +109,14 @@ public class ContainerRunnerImpl extends CompositeService 
implements ContainerRu
   private final String clusterId;
   private final DaemonId daemonId;
   private final UgiFactory fsUgiFactory;
+  private final SocketFactory socketFactory;
 
   public ContainerRunnerImpl(Configuration conf, int numExecutors, int 
waitQueueSize,
       boolean enablePreemption, String[] localDirsBase, 
AtomicReference<Integer> localShufflePort,
       AtomicReference<InetSocketAddress> localAddress,
       long totalMemoryAvailableBytes, LlapDaemonExecutorMetrics metrics,
-      AMReporter amReporter, ClassLoader classLoader, DaemonId daemonId, 
UgiFactory fsUgiFactory) {
+      AMReporter amReporter, ClassLoader classLoader, DaemonId daemonId, 
UgiFactory fsUgiFactory,
+      SocketFactory socketFactory) {
     super("ContainerRunnerImpl");
     Preconditions.checkState(numExecutors > 0,
         "Invalid number of executors: " + numExecutors + ". Must be > 0");
@@ -122,6 +126,7 @@ public class ContainerRunnerImpl extends CompositeService 
implements ContainerRu
     this.signer = UserGroupInformation.isSecurityEnabled()
         ? new LlapSignerImpl(conf, daemonId.getClusterString()) : null;
     this.fsUgiFactory = fsUgiFactory;
+    this.socketFactory = socketFactory;
 
     this.clusterId = daemonId.getClusterString();
     this.daemonId = daemonId;
@@ -239,7 +244,8 @@ public class ContainerRunnerImpl extends CompositeService 
implements ContainerRu
           queryIdentifier, qIdProto.getApplicationIdString(), dagId,
           vertex.getDagName(), vertex.getHiveQueryId(), dagIdentifier,
           vertex.getVertexName(), request.getFragmentNumber(), 
request.getAttemptNumber(),
-          vertex.getUser(), vertex, jobToken, fragmentIdString, tokenInfo);
+          vertex.getUser(), vertex, jobToken, fragmentIdString, tokenInfo, 
request.getAmHost(),
+          request.getAmPort());
 
       String[] localDirs = fragmentInfo.getLocalDirs();
       Preconditions.checkNotNull(localDirs);
@@ -250,12 +256,12 @@ public class ContainerRunnerImpl extends CompositeService 
implements ContainerRu
       // Used for re-localization, to add the user specified configuration 
(conf_pb_binary_stream)
 
       Configuration callableConf = new Configuration(getConfig());
-      UserGroupInformation taskUgi = fsUgiFactory == null ? null : 
fsUgiFactory.createUgi();
+      UserGroupInformation fsTaskUgi = fsUgiFactory == null ? null : 
fsUgiFactory.createUgi();
       TaskRunnerCallable callable = new TaskRunnerCallable(request, 
fragmentInfo, callableConf,
           new ExecutionContextImpl(localAddress.get().getHostName()), env,
           credentials, memoryPerExecutor, amReporter, confParams, metrics, 
killedTaskHandler,
-          this, tezHadoopShim, attemptId, vertex, initialEvent, taskUgi,
-          completionListener);
+          this, tezHadoopShim, attemptId, vertex, initialEvent, fsTaskUgi,
+          completionListener, socketFactory);
       submissionState = executorService.schedule(callable);
 
       if (LOG.isInfoEnabled()) {

http://git-wip-us.apache.org/repos/asf/hive/blob/59cde667/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
index 0494f0d..1ede5a1 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
@@ -29,6 +29,7 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
 import javax.management.ObjectName;
+import javax.net.SocketFactory;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.common.LogUtils;
@@ -63,6 +64,7 @@ import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFBridge;
 import 
org.apache.hadoop.hive.ql.udf.generic.GenericUDFBridge.UdfWhitelistChecker;
 import org.apache.hadoop.metrics2.util.MBeans;
+import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.service.CompositeService;
@@ -106,6 +108,7 @@ public class LlapDaemon extends CompositeService implements 
ContainerRunner, Lla
   private final long maxJvmMemory;
   private final String[] localDirs;
   private final DaemonId daemonId;
+  private final SocketFactory socketFactory;
 
   // TODO Not the best way to share the address
   private final AtomicReference<InetSocketAddress> srvAddress = new 
AtomicReference<>(),
@@ -257,8 +260,9 @@ public class LlapDaemon extends CompositeService implements 
ContainerRunner, Lla
         " sessionId: " + sessionId);
 
     int maxAmReporterThreads = HiveConf.getIntVar(daemonConf, 
ConfVars.LLAP_DAEMON_AM_REPORTER_MAX_THREADS);
+    this.socketFactory = NetUtils.getDefaultSocketFactory(daemonConf);
     this.amReporter = new AMReporter(numExecutors, maxAmReporterThreads, 
srvAddress,
-        new QueryFailedHandlerProxy(), daemonConf, daemonId);
+        new QueryFailedHandlerProxy(), daemonConf, daemonId, socketFactory);
 
     SecretManager sm = null;
     if (UserGroupInformation.isSecurityEnabled()) {
@@ -276,7 +280,7 @@ public class LlapDaemon extends CompositeService implements 
ContainerRunner, Lla
     }
     this.containerRunner = new ContainerRunnerImpl(daemonConf, numExecutors, 
waitQueueSize,
         enablePreemption, localDirs, this.shufflePort, srvAddress, 
executorMemoryPerInstance, metrics,
-        amReporter, executorClassLoader, daemonId, fsUgiFactory);
+        amReporter, executorClassLoader, daemonId, fsUgiFactory, 
socketFactory);
     addIfService(containerRunner);
 
     // Not adding the registry as a service, since we need to control when it 
is initialized - conf used to pickup properties.

http://git-wip-us.apache.org/repos/asf/hive/blob/59cde667/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java
index 5d07e90..5f0271f 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java
@@ -16,6 +16,7 @@ package org.apache.hadoop.hive.llap.daemon.impl;
 
 import java.io.File;
 import java.io.IOException;
+import java.net.InetSocketAddress;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -25,6 +26,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReentrantLock;
 
 import com.google.common.base.Preconditions;
@@ -37,6 +39,11 @@ import org.apache.hadoop.hive.llap.LlapUtil;
 import org.apache.hadoop.hive.llap.daemon.FinishableStateUpdateHandler;
 import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SignableVertexSpec;
 import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateProto;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.tez.common.security.JobTokenIdentifier;
 
 public class QueryInfo {
   private final QueryIdentifier queryIdentifier;
@@ -58,6 +65,7 @@ public class QueryInfo {
 
   private final FinishableStateTracker finishableStateTracker = new 
FinishableStateTracker();
   private final String tokenUserName, appId;
+  private final AtomicReference<UserGroupInformation> umbilicalUgi;
 
   public QueryInfo(QueryIdentifier queryIdentifier, String appIdString, String 
dagIdString,
                    String dagName, String hiveQueryIdString,
@@ -77,6 +85,7 @@ public class QueryInfo {
     this.localFs = localFs;
     this.tokenUserName = tokenUserName;
     this.appId = tokenAppId;
+    this.umbilicalUgi = new AtomicReference<>();
   }
 
   public QueryIdentifier getQueryIdentifier() {
@@ -298,4 +307,24 @@ public class QueryInfo {
   public String getTokenAppId() {
     return appId;
   }
+
+  public void setupUmbilicalUgi(String umbilicalUser, 
Token<JobTokenIdentifier> appToken, String amHost, int amPort) {
+    synchronized (umbilicalUgi) {
+      if (umbilicalUgi.get() == null) {
+        UserGroupInformation taskOwner =
+            UserGroupInformation.createRemoteUser(umbilicalUser);
+        final InetSocketAddress address =
+            NetUtils.createSocketAddrForHost(amHost, amPort);
+        SecurityUtil.setTokenService(appToken, address);
+        taskOwner.addToken(appToken);
+        umbilicalUgi.set(taskOwner);
+      }
+    }
+  }
+
+  public UserGroupInformation getUmbilicalUgi() {
+    synchronized (umbilicalUgi) {
+      return umbilicalUgi.get();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/59cde667/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
index 9eaddd2..5cf3a38 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
@@ -139,7 +139,7 @@ public class QueryTracker extends AbstractService {
   QueryFragmentInfo registerFragment(QueryIdentifier queryIdentifier, String 
appIdString, String dagIdString,
       String dagName, String hiveQueryIdString, int dagIdentifier, String 
vertexName, int fragmentNumber, int attemptNumber,
       String user, SignableVertexSpec vertex, Token<JobTokenIdentifier> 
appToken,
-      String fragmentIdString, LlapTokenInfo tokenInfo) throws IOException {
+      String fragmentIdString, LlapTokenInfo tokenInfo, String amHost, int 
amPort) throws IOException {
 
     ReadWriteLock dagLock = getDagLock(queryIdentifier);
     // Note: This is a readLock to prevent a race with queryComplete. 
Operations
@@ -174,6 +174,8 @@ public class QueryTracker extends AbstractService {
         if (old != null) {
           queryInfo = old;
         } else {
+          // Ensure the UGI is setup once.
+          queryInfo.setupUmbilicalUgi(vertex.getTokenIdentifier(), appToken, 
amHost, amPort);
           isExistingQueryInfo = false;
         }
       }

http://git-wip-us.apache.org/repos/asf/hive/blob/59cde667/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
index fe124ff..8739d5b 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
@@ -41,7 +41,6 @@ import org.apache.hadoop.hive.ql.io.IOContextMap;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.log4j.MDC;
@@ -65,6 +64,7 @@ import org.apache.tez.runtime.task.TezTaskRunner2;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.net.SocketFactory;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.security.PrivilegedExceptionAction;
@@ -116,7 +116,8 @@ public class TaskRunnerCallable extends 
CallableWithNdc<TaskRunner2Result> {
   private final SignableVertexSpec vertex;
   private final TezEvent initialEvent;
   private final SchedulerFragmentCompletingListener completionListener;
-  private UserGroupInformation taskUgi;
+  private UserGroupInformation fsTaskUgi;
+  private final SocketFactory socketFactory;
 
   @VisibleForTesting
   public TaskRunnerCallable(SubmitWorkRequestProto request, QueryFragmentInfo 
fragmentInfo,
@@ -125,7 +126,8 @@ public class TaskRunnerCallable extends 
CallableWithNdc<TaskRunner2Result> {
                             LlapDaemonExecutorMetrics metrics, 
KilledTaskHandler killedTaskHandler,
                             FragmentCompletionHandler fragmentCompleteHandler, 
HadoopShim tezHadoopShim,
                             TezTaskAttemptID attemptId, SignableVertexSpec 
vertex, TezEvent initialEvent,
-                            UserGroupInformation taskUgi, 
SchedulerFragmentCompletingListener completionListener) {
+                            UserGroupInformation fsTaskUgi, 
SchedulerFragmentCompletingListener completionListener,
+                            SocketFactory socketFactory) {
     this.request = request;
     this.fragmentInfo = fragmentInfo;
     this.conf = conf;
@@ -153,8 +155,9 @@ public class TaskRunnerCallable extends 
CallableWithNdc<TaskRunner2Result> {
     this.fragmentCompletionHanler = fragmentCompleteHandler;
     this.tezHadoopShim = tezHadoopShim;
     this.initialEvent = initialEvent;
-    this.taskUgi = taskUgi;
+    this.fsTaskUgi = fsTaskUgi;
     this.completionListener = completionListener;
+    this.socketFactory = socketFactory;
   }
 
   public long getStartTime() {
@@ -195,27 +198,27 @@ public class TaskRunnerCallable extends 
CallableWithNdc<TaskRunner2Result> {
 
       // TODO Consolidate this code with TezChild.
       runtimeWatch.start();
-      if (taskUgi == null) {
-        taskUgi = UserGroupInformation.createRemoteUser(vertex.getUser());
+      if (fsTaskUgi == null) {
+        fsTaskUgi = UserGroupInformation.createRemoteUser(vertex.getUser());
       }
-      taskUgi.addCredentials(credentials);
+      fsTaskUgi.addCredentials(credentials);
 
       Map<String, ByteBuffer> serviceConsumerMetadata = new HashMap<>();
       serviceConsumerMetadata.put(TezConstants.TEZ_SHUFFLE_HANDLER_SERVICE_ID,
           TezCommonUtils.convertJobTokenToBytes(jobToken));
       Multimap<String, String> startedInputsMap = 
createStartedInputMap(vertex);
 
-      UserGroupInformation taskOwner =
-          UserGroupInformation.createRemoteUser(vertex.getTokenIdentifier());
+      final UserGroupInformation taskOwner = 
fragmentInfo.getQueryInfo().getUmbilicalUgi();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("taskOwner hashCode:" + taskOwner.hashCode());
+      }
       final InetSocketAddress address =
           NetUtils.createSocketAddrForHost(request.getAmHost(), 
request.getAmPort());
-      SecurityUtil.setTokenService(jobToken, address);
-      taskOwner.addToken(jobToken);
       umbilical = taskOwner.doAs(new 
PrivilegedExceptionAction<LlapTaskUmbilicalProtocol>() {
         @Override
         public LlapTaskUmbilicalProtocol run() throws Exception {
           return RPC.getProxy(LlapTaskUmbilicalProtocol.class,
-              LlapTaskUmbilicalProtocol.versionID, address, conf);
+              LlapTaskUmbilicalProtocol.versionID, address, taskOwner, conf, 
socketFactory);
         }
       });
 
@@ -237,7 +240,7 @@ public class TaskRunnerCallable extends 
CallableWithNdc<TaskRunner2Result> {
       try {
         synchronized (this) {
           if (shouldRunTask) {
-            taskRunner = new TezTaskRunner2(conf, taskUgi, 
fragmentInfo.getLocalDirs(),
+            taskRunner = new TezTaskRunner2(conf, fsTaskUgi, 
fragmentInfo.getLocalDirs(),
                 taskSpec,
                 vertex.getQueryIdentifier().getAppAttemptNumber(),
                 serviceConsumerMetadata, envMap, startedInputsMap, 
taskReporter, executor,
@@ -259,7 +262,7 @@ public class TaskRunnerCallable extends 
CallableWithNdc<TaskRunner2Result> {
           isCompleted.set(true);
           return result;
         } finally {
-          FileSystem.closeAllForUGI(taskUgi);
+          FileSystem.closeAllForUGI(fsTaskUgi);
           LOG.info("ExecutionTime for Container: " + 
request.getContainerIdString() + "=" +
                   runtimeWatch.stop().elapsedMillis());
           if (LOG.isDebugEnabled()) {

http://git-wip-us.apache.org/repos/asf/hive/blob/59cde667/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
index 5dc1be5..ae3328a 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
@@ -44,6 +44,8 @@ import org.apache.tez.runtime.task.TaskRunner2Result;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.net.SocketFactory;
+
 public class TaskExecutorTestHelpers {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(TestTaskExecutorService.class);
@@ -184,7 +186,7 @@ public class TaskExecutorTestHelpers {
           mock(KilledTaskHandler.class), mock(
               FragmentCompletionHandler.class), new DefaultHadoopShim(), null,
               requestProto.getWorkSpec().getVertex(), initialEvent, null, mock(
-              SchedulerFragmentCompletingListener.class));
+              SchedulerFragmentCompletingListener.class), 
mock(SocketFactory.class));
       this.workTime = workTime;
       this.canFinish = canFinish;
     }

Reply via email to