abdullah alamoudi has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/1961

Change subject: [ASTERIXDB-2058][HYR] Only complete job cancellation after 
cleanup
......................................................................

[ASTERIXDB-2058][HYR] Only complete job cancellation after cleanup

Change-Id: I14b4bbd512cc88e489254d8bf82edba0fd3a3db5
---
M 
asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SessionConfig.java
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ApiServlet.java
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
R 
asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/NCQueryServiceServlet.java
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/api/java/AsterixJavaClient.java
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java
A 
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryRequest.java
A 
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryResponse.java
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementResponseMessage.java
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FlushDatasetUtil.java
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
M 
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/JobUtils.java
M 
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
M 
hyracks-fullstack/algebricks/algebricks-tests/src/test/java/org/apache/hyracks/algebricks/tests/util/AlgebricksHyracksIntegrationUtil.java
M 
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java
M 
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
M 
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
M 
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java
M 
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java
M 
hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/dataset/HyracksDatasetDirectoryServiceInterfaceRemoteProxy.java
M 
hyracks-fullstack/hyracks/hyracks-client/src/test/java/org/apache/hyracks/client/stats/HyracksUtils.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/IJobManager.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/CancelJobWork.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobCleanupWork.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/job/JobManagerTest.java
A 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/work/DummyCallback.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/work/IResultCallback.java
M 
hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/org/apache/hyracks/examples/btree/client/InsertPipelineExample.java
M 
hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/org/apache/hyracks/examples/btree/client/PrimaryIndexBulkLoadExample.java
M 
hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/org/apache/hyracks/examples/btree/client/PrimaryIndexSearchExample.java
M 
hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/org/apache/hyracks/examples/btree/client/SecondaryIndexBulkLoadExample.java
M 
hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/org/apache/hyracks/examples/btree/client/SecondaryIndexSearchExample.java
M 
hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractIntegrationTest.java
M 
hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
M 
hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/JobStatusAPIIntegrationTest.java
M 
hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/PredistributedJobsTest.java
M 
hyracks-fullstack/hyracks/hyracks-examples/text-example/textclient/src/main/java/org/apache/hyracks/examples/text/client/WordCountMain.java
M 
hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Groupby.java
M 
hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Join.java
M 
hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Sort.java
M 
hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs/dataflow/DataflowTest.java
M 
hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs/utils/HyracksUtils.java
M 
hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs2/dataflow/DataflowTest.java
M 
hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
M 
hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/RPCInterface.java
M 
hyracks-fullstack/hyracks/hyracks-ipc/src/test/java/org/apache/hyracks/ipc/tests/IPCTest.java
53 files changed, 552 insertions(+), 308 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb 
refs/changes/61/1961/1

diff --git 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SessionConfig.java
 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SessionConfig.java
index cfd4e87..b09d139 100644
--- 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SessionConfig.java
+++ 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SessionConfig.java
@@ -111,18 +111,20 @@
     private final boolean executeQuery;
     private final boolean generateJobSpec;
     private final boolean optimize;
+    private final long timeout;
 
     // Flags.
     private final Map<String, Boolean> flags;
 
     public SessionConfig(OutputFormat fmt) {
-        this(fmt, true, true, true);
+        this(fmt, true, true, true, Long.MAX_VALUE);
     }
 
     /**
      * Create a SessionConfig object with all optional values set to defaults:
      * - All format flags set to "false".
      * - All out-of-band outputs set to "false".
+     *
      * @param fmt
      *            Output format for execution output.
      * @param optimize
@@ -132,12 +134,14 @@
      * @param generateJobSpec
      *            Whether to generate the Hyracks job specification (if
      */
-    public SessionConfig(OutputFormat fmt, boolean optimize, boolean 
executeQuery, boolean generateJobSpec) {
+    public SessionConfig(OutputFormat fmt, boolean optimize, boolean 
executeQuery, boolean generateJobSpec,
+            long timeout) {
         this.fmt = fmt;
         this.optimize = optimize;
         this.executeQuery = executeQuery;
         this.generateJobSpec = generateJobSpec;
         this.flags = new HashMap<>();
+        this.timeout = timeout;
     }
 
     /**
@@ -203,4 +207,8 @@
         Boolean value = flags.get(flag);
         return value == null ? false : value.booleanValue();
     }
+
+    public long getTimeout() {
+        return timeout;
+    }
 }
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
index 583302b..f31e993 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
@@ -356,13 +356,13 @@
         return spec;
     }
 
-    public void executeJobArray(IHyracksClientConnection hcc, 
JobSpecification[] specs, PrintWriter out)
+    public void executeJobArray(IHyracksClientConnection hcc, 
JobSpecification[] specs, PrintWriter out, long timeout)
             throws Exception {
         for (JobSpecification spec : specs) {
             spec.setMaxReattempts(0);
             JobId jobId = hcc.startJob(spec);
             long startTime = System.currentTimeMillis();
-            hcc.waitForCompletion(jobId);
+            hcc.waitForCompletion(jobId, timeout);
             long endTime = System.currentTimeMillis();
             double duration = (endTime - startTime) / 1000.00;
             out.println("<pre>Duration: " + duration + " sec</pre>");
@@ -370,7 +370,8 @@
 
     }
 
-    public void executeJobArray(IHyracksClientConnection hcc, Job[] jobs, 
PrintWriter out) throws Exception {
+    public void executeJobArray(IHyracksClientConnection hcc, Job[] jobs, 
PrintWriter out, long timeout)
+            throws Exception {
         for (Job job : jobs) {
             job.getJobSpec().setMaxReattempts(0);
             long startTime = System.currentTimeMillis();
@@ -379,7 +380,7 @@
                 if (job.getSubmissionMode() == SubmissionMode.ASYNCHRONOUS) {
                     continue;
                 }
-                hcc.waitForCompletion(jobId);
+                hcc.waitForCompletion(jobId, timeout);
             } catch (Exception e) {
                 e.printStackTrace();
                 continue;
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ApiServlet.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ApiServlet.java
index 7874aa3..e7b7435 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ApiServlet.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ApiServlet.java
@@ -138,7 +138,7 @@
             }
             IParser parser = parserFactory.createParser(query);
             List<Statement> aqlStatements = parser.parse();
-            SessionConfig sessionConfig = new SessionConfig(format, true, 
isSet(executeQuery), true);
+            SessionConfig sessionConfig = new SessionConfig(format, true, 
isSet(executeQuery), true, Long.MAX_VALUE);
             sessionConfig.set(SessionConfig.FORMAT_HTML, true);
             sessionConfig.set(SessionConfig.FORMAT_CSV_HEADER, csvAndHeader);
             sessionConfig.set(SessionConfig.FORMAT_WRAPPER_ARRAY, 
isSet(wrapperArray));
@@ -150,8 +150,8 @@
                     compilationProvider, componentProvider);
             double duration;
             long startTime = System.currentTimeMillis();
-            translator.compileAndExecute(hcc, hds, 
IStatementExecutor.ResultDelivery.IMMEDIATE,
-                    null, new IStatementExecutor.Stats());
+            translator.compileAndExecute(hcc, hds, 
IStatementExecutor.ResultDelivery.IMMEDIATE, null,
+                    new IStatementExecutor.Stats());
             long endTime = System.currentTimeMillis();
             duration = (endTime - startTime) / 1000.00;
             out.println(HTML_STATEMENT_SEPARATOR);
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
index 1f1d282..e39b63f 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
@@ -121,7 +121,7 @@
         }
     }
 
-    private enum Attribute {
+    protected enum Attribute {
         HEADER("header"),
         LOSSLESS("lossless");
 
@@ -182,14 +182,14 @@
         }
     }
 
-    static class RequestParameters {
-        String host;
-        String path;
-        String statement;
-        String format;
-        boolean pretty;
-        String clientContextID;
-        String mode;
+    protected static class RequestParameters {
+        public String host;
+        public String path;
+        public String statement;
+        public String format;
+        public boolean pretty;
+        public String clientContextID;
+        public String mode;
 
         @Override
         public String toString() {
@@ -209,7 +209,7 @@
         }
     }
 
-    private static String getParameterValue(String content, String attribute) {
+    protected static String getParameterValue(String content, String 
attribute) {
         if (content == null || attribute == null) {
             return null;
         }
@@ -231,7 +231,7 @@
         return s != null ? s.toLowerCase() : s;
     }
 
-    private static SessionConfig.OutputFormat getFormat(String format) {
+    protected static SessionConfig.OutputFormat getFormat(String format) {
         if (format != null) {
             if (format.startsWith(HttpUtil.ContentType.CSV)) {
                 return SessionConfig.OutputFormat.CSV;
@@ -247,8 +247,7 @@
         return SessionConfig.OutputFormat.CLEAN_JSON;
     }
 
-    private static SessionOutput createSessionOutput(RequestParameters param, 
String handleUrl,
-            PrintWriter resultWriter) {
+    protected SessionOutput createSessionOutput(RequestParameters param, 
String handleUrl, PrintWriter resultWriter) {
         SessionOutput.ResultDecorator resultPrefix = 
ResultUtil.createPreResultDecorator();
         SessionOutput.ResultDecorator resultPostfix = 
ResultUtil.createPostResultDecorator();
         SessionOutput.ResultAppender appendHandle = 
ResultUtil.createResultHandleAppender(handleUrl);
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/NCQueryServiceServlet.java
similarity index 62%
rename from 
asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
rename to 
asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/NCQueryServiceServlet.java
index 9547514..4f4221e 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/NCQueryServiceServlet.java
@@ -17,14 +17,18 @@
  * under the License.
  */
 
-package org.apache.asterix.api.http.server;
+package org.apache.asterix.api.http.servlet;
 
+import java.io.PrintWriter;
+import java.util.UUID;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeoutException;
 import java.util.logging.Level;
 
-import io.netty.handler.codec.http.HttpResponseStatus;
 import org.apache.asterix.algebra.base.ILangExtension;
+import org.apache.asterix.api.http.server.QueryServiceServlet;
+import org.apache.asterix.api.http.server.ResultUtil;
+import org.apache.asterix.app.message.CancelQueryRequest;
 import org.apache.asterix.app.message.ExecuteStatementRequestMessage;
 import org.apache.asterix.app.message.ExecuteStatementResponseMessage;
 import org.apache.asterix.app.result.ResultReader;
@@ -34,6 +38,7 @@
 import org.apache.asterix.common.messaging.api.MessageFuture;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.translator.IStatementExecutor;
+import org.apache.asterix.translator.SessionConfig;
 import org.apache.asterix.translator.SessionOutput;
 import org.apache.commons.lang3.tuple.Triple;
 import org.apache.hyracks.api.application.INCServiceContext;
@@ -41,11 +46,14 @@
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.ipc.exceptions.IPCException;
 
+import io.netty.handler.codec.http.HttpResponseStatus;
+
 /**
  * Query service servlet that can run on NC nodes.
  * Delegates query execution to CC, then serves the result.
  */
 public class NCQueryServiceServlet extends QueryServiceServlet {
+
     public NCQueryServiceServlet(ConcurrentMap<String, Object> ctx, String[] 
paths, IApplicationContext appCtx,
             ILangExtension.Language queryLanguage) {
         super(ctx, paths, appCtx, queryLanguage, null, null, null);
@@ -63,13 +71,23 @@
         ExecuteStatementResponseMessage responseMsg;
         MessageFuture responseFuture = ncMb.registerMessageFuture();
         try {
+            if (param.clientContextID == null) {
+                param.clientContextID = UUID.randomUUID().toString();
+            }
             ExecuteStatementRequestMessage requestMsg =
                     new ExecuteStatementRequestMessage(ncCtx.getNodeId(), 
responseFuture.getFutureId(), queryLanguage,
                             statementsText, sessionOutput.config(), 
ccDelivery, param.clientContextID, handleUrl);
             outExecStartEnd[0] = System.nanoTime();
             ncMb.sendMessageToCC(requestMsg);
-            responseMsg = (ExecuteStatementResponseMessage) responseFuture.get(
-                    ExecuteStatementResponseMessage.DEFAULT_TIMEOUT_MILLIS, 
java.util.concurrent.TimeUnit.MILLISECONDS);
+            try {
+                responseMsg = (ExecuteStatementResponseMessage) 
responseFuture.get(
+                        
ExecuteStatementRequestMessage.DEFAULT_NC_TIMEOUT_MILLIS,
+                        java.util.concurrent.TimeUnit.MILLISECONDS);
+            } catch (TimeoutException exception) {
+                // cancel query
+                cancelQuery(ncMb, ncCtx.getNodeId(), param.clientContextID, 
exception);
+                throw exception;
+            }
             outExecStartEnd[1] = System.nanoTime();
         } finally {
             ncMb.deregisterMessageFuture(responseFuture.getFutureId());
@@ -97,6 +115,39 @@
         }
     }
 
+    private void cancelQuery(INCMessageBroker messageBroker, String nodeId, 
String clientContextID,
+            TimeoutException exception) {
+        MessageFuture cancelQueryFuture = 
messageBroker.registerMessageFuture();
+        CancelQueryRequest cancelQueryMessage =
+                new CancelQueryRequest(nodeId, 
cancelQueryFuture.getFutureId(), clientContextID);
+        try {
+            messageBroker.sendMessageToCC(cancelQueryMessage);
+            
cancelQueryFuture.get(ExecuteStatementRequestMessage.DEFAULT_QUERY_CANCELLATION_TIMEOUT_MILLIS,
+                    java.util.concurrent.TimeUnit.MILLISECONDS);
+        } catch (Exception e) {
+            exception.addSuppressed(e);
+        }
+    }
+
+    @Override
+    protected SessionOutput createSessionOutput(RequestParameters param, 
String handleUrl, PrintWriter resultWriter) {
+        SessionOutput.ResultDecorator resultPrefix = 
ResultUtil.createPreResultDecorator();
+        SessionOutput.ResultDecorator resultPostfix = 
ResultUtil.createPostResultDecorator();
+        SessionOutput.ResultAppender appendHandle = 
ResultUtil.createResultHandleAppender(handleUrl);
+        SessionOutput.ResultAppender appendStatus = 
ResultUtil.createResultStatusAppender();
+
+        SessionConfig.OutputFormat format = 
QueryServiceServlet.getFormat(param.format);
+        SessionConfig sessionConfig =
+                new SessionConfig(format, true, true, true, 
ExecuteStatementRequestMessage.DEFAULT_JOB_TIMEOUT_MILLIS);
+        sessionConfig.set(SessionConfig.FORMAT_WRAPPER_ARRAY, true);
+        sessionConfig.set(SessionConfig.FORMAT_INDENT_JSON, param.pretty);
+        sessionConfig.set(SessionConfig.FORMAT_QUOTE_RECORD,
+                format != SessionConfig.OutputFormat.CLEAN_JSON && format != 
SessionConfig.OutputFormat.LOSSLESS_JSON);
+        sessionConfig.set(SessionConfig.FORMAT_CSV_HEADER, format == 
SessionConfig.OutputFormat.CSV
+                && 
"present".equals(QueryServiceServlet.getParameterValue(param.format, 
Attribute.HEADER.str())));
+        return new SessionOutput(sessionConfig, resultWriter, resultPrefix, 
resultPostfix, appendHandle, appendStatus);
+    }
+
     @Override
     protected HttpResponseStatus handleExecuteStatementException(Throwable t) {
         if (t instanceof IPCException || t instanceof TimeoutException) {
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/java/AsterixJavaClient.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/java/AsterixJavaClient.java
index a9d24b9..d6bbb6e 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/java/AsterixJavaClient.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/java/AsterixJavaClient.java
@@ -100,7 +100,7 @@
         List<Statement> statements = parser.parse();
         MetadataManager.INSTANCE.init();
 
-        SessionConfig conf = new SessionConfig(OutputFormat.ADM, optimize, 
true, generateBinaryRuntime);
+        SessionConfig conf = new SessionConfig(OutputFormat.ADM, optimize, 
true, generateBinaryRuntime, Long.MAX_VALUE);
         conf.setOOBData(false, printRewrittenExpressions, printLogicalPlan, 
printOptimizedPlan, printJob);
         if (printPhysicalOpsOnly) {
             conf.set(SessionConfig.FORMAT_ONLY_PHYSICAL_OPS, true);
@@ -109,17 +109,17 @@
 
         IStatementExecutor translator = 
statementExecutorFactory.create(appCtx, statements, output, compilationProvider,
                 storageComponentProvider);
-        translator.compileAndExecute(hcc, null, 
QueryTranslator.ResultDelivery.IMMEDIATE,
-                null, new IStatementExecutor.Stats());
+        translator.compileAndExecute(hcc, null, 
QueryTranslator.ResultDelivery.IMMEDIATE, null,
+                new IStatementExecutor.Stats());
         writer.flush();
     }
 
     public void execute() throws Exception {
         if (dmlJobs != null) {
-            apiFramework.executeJobArray(hcc, dmlJobs, writer);
+            apiFramework.executeJobArray(hcc, dmlJobs, writer, Long.MAX_VALUE);
         }
         if (queryJobSpec != null) {
-            apiFramework.executeJobArray(hcc, new JobSpecification[] { 
queryJobSpec }, writer);
+            apiFramework.executeJobArray(hcc, new JobSpecification[] { 
queryJobSpec }, writer, Long.MAX_VALUE);
         }
     }
 
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java
index 124e56e..e01b1c5 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java
@@ -97,7 +97,7 @@
             // We will need to design general exception handling mechanism for 
feeds.
             setLocations(jobInfo.getRight());
             boolean wait = 
Boolean.parseBoolean(mdProvider.getConfig().get(StartFeedStatement.WAIT_FOR_COMPLETION));
-            JobUtils.runJob(hcc, feedJob, false);
+            JobUtils.runJob(hcc, feedJob);
             eventSubscriber.sync();
             if (wait) {
                 IActiveEntityEventSubscriber stoppedSubscriber = new 
WaitForStateSubscriber(this,
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryRequest.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryRequest.java
new file mode 100644
index 0000000..fb6ec37
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryRequest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.message;
+
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.messaging.api.ICcAddressedMessage;
+import org.apache.asterix.hyracks.bootstrap.CCApplication;
+import org.apache.asterix.messaging.CCMessageBroker;
+import org.apache.asterix.translator.IStatementExecutorContext;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.control.cc.ClusterControllerService;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+
+public class CancelQueryRequest implements ICcAddressedMessage {
+
+    private static final Logger LOGGER = 
Logger.getLogger(CancelQueryRequest.class.getName());
+    private static final long serialVersionUID = 1L;
+    private final String nodeId;
+    private final long reqId;
+    private final String contextId;
+
+    public CancelQueryRequest(String nodeId, long reqId, String contextId) {
+        this.nodeId = nodeId;
+        this.reqId = reqId;
+        this.contextId = contextId;
+    }
+
+    @Override
+    public void handle(ICcApplicationContext appCtx) throws 
HyracksDataException, InterruptedException {
+        ClusterControllerService ccs = (ClusterControllerService) 
appCtx.getServiceContext().getControllerService();
+        CCApplication application = (CCApplication) ccs.getApplication();
+        IStatementExecutorContext executorsCtx = 
application.getStatementExecutorContext();
+        JobId jobId = executorsCtx.getJobIdFromClientContextId(contextId);
+
+        if (jobId == null) {
+            LOGGER.log(Level.WARN, "No job found for context id " + contextId);
+        } else {
+            try {
+                IHyracksClientConnection hcc = application.getHcc();
+                hcc.cancelJob(jobId);
+                executorsCtx.removeJobIdFromClientContextId(contextId);
+            } catch (Exception e) {
+                LOGGER.log(Level.WARN, "unexpected exception thrown from 
cancel", e);
+            }
+        }
+        CancelQueryResponse response = new CancelQueryResponse(reqId);
+        CCMessageBroker messageBroker = (CCMessageBroker) 
appCtx.getServiceContext().getMessageBroker();
+        try {
+            messageBroker.sendApplicationMessageToNC(response, nodeId);
+        } catch (Exception e) {
+            LOGGER.log(Level.WARN, "Failure sending response to nc", e);
+        }
+    }
+
+}
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryResponse.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryResponse.java
new file mode 100644
index 0000000..4fbcf22
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryResponse.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.message;
+
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.messaging.api.INcAddressedMessage;
+import org.apache.asterix.common.messaging.api.MessageFuture;
+import org.apache.asterix.messaging.NCMessageBroker;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class CancelQueryResponse implements INcAddressedMessage {
+
+    private static final long serialVersionUID = 1L;
+    private final long reqId;
+
+    public CancelQueryResponse(long reqId) {
+        this.reqId = reqId;
+    }
+
+    @Override
+    public void handle(INcApplicationContext appCtx) throws 
HyracksDataException, InterruptedException {
+        NCMessageBroker mb = (NCMessageBroker) 
appCtx.getServiceContext().getMessageBroker();
+        MessageFuture future = mb.deregisterMessageFuture(reqId);
+        if (future != null) {
+            future.complete(this);
+        }
+    }
+}
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
index e7919fa..e53cc83 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
@@ -22,6 +22,7 @@
 import java.io.PrintWriter;
 import java.io.StringWriter;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -54,23 +55,17 @@
 
 public final class ExecuteStatementRequestMessage implements 
ICcAddressedMessage {
     private static final long serialVersionUID = 1L;
-
     private static final Logger LOGGER = 
Logger.getLogger(ExecuteStatementRequestMessage.class.getName());
-
+    public static final long DEFAULT_NC_TIMEOUT_MILLIS = 
TimeUnit.MINUTES.toMillis(6);
+    public static final long DEFAULT_JOB_TIMEOUT_MILLIS = 
TimeUnit.MINUTES.toMillis(5);
+    public static final long DEFAULT_QUERY_CANCELLATION_TIMEOUT_MILLIS = 
TimeUnit.MINUTES.toMillis(1);
     private final String requestNodeId;
-
     private final long requestMessageId;
-
     private final ILangExtension.Language lang;
-
     private final String statementsText;
-
     private final SessionConfig sessionConfig;
-
     private final IStatementExecutor.ResultDelivery delivery;
-
     private final String clientContextID;
-
     private final String handleUrl;
 
     public ExecuteStatementRequestMessage(String requestNodeId, long 
requestMessageId, ILangExtension.Language lang,
@@ -102,47 +97,41 @@
         IStorageComponentProvider storageComponentProvider = 
ccAppCtx.getStorageComponentProvider();
         IStatementExecutorFactory statementExecutorFactory = 
ccApp.getStatementExecutorFactory();
         IStatementExecutorContext statementExecutorContext = 
ccApp.getStatementExecutorContext();
-
-        ccSrv.getExecutor().submit(() -> {
-            ExecuteStatementResponseMessage responseMsg = new 
ExecuteStatementResponseMessage(requestMessageId);
-            try {
-                IParser parser = 
compilationProvider.getParserFactory().createParser(statementsText);
-                List<Statement> statements = parser.parse();
-                StringWriter outWriter = new StringWriter(256);
-                PrintWriter outPrinter = new PrintWriter(outWriter);
-                SessionOutput.ResultDecorator resultPrefix = 
ResultUtil.createPreResultDecorator();
-                SessionOutput.ResultDecorator resultPostfix = 
ResultUtil.createPostResultDecorator();
-                SessionOutput.ResultAppender appendHandle = 
ResultUtil.createResultHandleAppender(handleUrl);
-                SessionOutput.ResultAppender appendStatus = 
ResultUtil.createResultStatusAppender();
-                SessionOutput sessionOutput = new SessionOutput(sessionConfig, 
outPrinter, resultPrefix, resultPostfix,
-                        appendHandle, appendStatus);
-
-                IStatementExecutor.ResultMetadata outMetadata = new 
IStatementExecutor.ResultMetadata();
-
-                MetadataManager.INSTANCE.init();
-                IStatementExecutor translator = 
statementExecutorFactory.create(ccAppCtx, statements, sessionOutput,
-                        compilationProvider, storageComponentProvider);
-                translator.compileAndExecute(ccAppCtx.getHcc(), null, 
delivery, outMetadata,
-                        new IStatementExecutor.Stats(), clientContextID, 
statementExecutorContext);
-
-                outPrinter.close();
-                responseMsg.setResult(outWriter.toString());
-                responseMsg.setMetadata(outMetadata);
-            } catch (AlgebricksException | HyracksException | TokenMgrError
-                    | org.apache.asterix.aqlplus.parser.TokenMgrError pe) {
-                // we trust that "our" exceptions are serializable and have a 
comprehensible error message
-                GlobalConfig.ASTERIX_LOGGER.log(Level.WARNING, 
pe.getMessage(), pe);
-                responseMsg.setError(pe);
-            } catch (Exception e) {
-                GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE, "Unexpected 
exception", e);
-                responseMsg.setError(new Exception(e.toString()));
-            }
-            try {
-                messageBroker.sendApplicationMessageToNC(responseMsg, 
requestNodeId);
-            } catch (Exception e) {
-                LOGGER.log(Level.WARNING, e.toString(), e);
-            }
-        });
+        ExecuteStatementResponseMessage responseMsg = new 
ExecuteStatementResponseMessage(requestMessageId);
+        try {
+            IParser parser = 
compilationProvider.getParserFactory().createParser(statementsText);
+            List<Statement> statements = parser.parse();
+            StringWriter outWriter = new StringWriter(256);
+            PrintWriter outPrinter = new PrintWriter(outWriter);
+            SessionOutput.ResultDecorator resultPrefix = 
ResultUtil.createPreResultDecorator();
+            SessionOutput.ResultDecorator resultPostfix = 
ResultUtil.createPostResultDecorator();
+            SessionOutput.ResultAppender appendHandle = 
ResultUtil.createResultHandleAppender(handleUrl);
+            SessionOutput.ResultAppender appendStatus = 
ResultUtil.createResultStatusAppender();
+            SessionOutput sessionOutput = new SessionOutput(sessionConfig, 
outPrinter, resultPrefix, resultPostfix,
+                    appendHandle, appendStatus);
+            IStatementExecutor.ResultMetadata outMetadata = new 
IStatementExecutor.ResultMetadata();
+            MetadataManager.INSTANCE.init();
+            IStatementExecutor translator = 
statementExecutorFactory.create(ccAppCtx, statements, sessionOutput,
+                    compilationProvider, storageComponentProvider);
+            translator.compileAndExecute(ccAppCtx.getHcc(), null, delivery, 
outMetadata, new IStatementExecutor.Stats(),
+                    clientContextID, statementExecutorContext);
+            outPrinter.close();
+            responseMsg.setResult(outWriter.toString());
+            responseMsg.setMetadata(outMetadata);
+        } catch (AlgebricksException | HyracksException | TokenMgrError
+                | org.apache.asterix.aqlplus.parser.TokenMgrError pe) {
+            // we trust that "our" exceptions are serializable and have a 
comprehensible error message
+            GlobalConfig.ASTERIX_LOGGER.log(Level.WARNING, pe.getMessage(), 
pe);
+            responseMsg.setError(pe);
+        } catch (Exception e) {
+            GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE, "Unexpected 
exception", e);
+            responseMsg.setError(new Exception(e.toString()));
+        }
+        try {
+            messageBroker.sendApplicationMessageToNC(responseMsg, 
requestNodeId);
+        } catch (Exception e) {
+            LOGGER.log(Level.WARNING, e.toString(), e);
+        }
     }
 
     private String getRejectionReason(ClusterControllerService ccSrv) {
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementResponseMessage.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementResponseMessage.java
index 4f9aa0c..54f0a4e 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementResponseMessage.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementResponseMessage.java
@@ -19,8 +19,6 @@
 
 package org.apache.asterix.app.message;
 
-import java.util.concurrent.TimeUnit;
-
 import org.apache.asterix.common.api.INcApplicationContext;
 import org.apache.asterix.common.messaging.api.INcAddressedMessage;
 import org.apache.asterix.common.messaging.api.MessageFuture;
@@ -30,8 +28,6 @@
 
 public final class ExecuteStatementResponseMessage implements 
INcAddressedMessage {
     private static final long serialVersionUID = 1L;
-
-    public static final long DEFAULT_TIMEOUT_MILLIS = 
TimeUnit.MINUTES.toMillis(5);
 
     private final long requestMessageId;
 
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index b97c014..81210d5 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -267,8 +267,8 @@
          */
         String threadName = Thread.currentThread().getName();
         Thread.currentThread().setName(QueryTranslator.class.getSimpleName());
-        Map<String, String> config = new HashMap<>();
         try {
+            Map<String, String> config = new HashMap<>();
             for (Statement stmt : statements) {
                 if (sessionConfig.is(SessionConfig.FORMAT_HTML)) {
                     
sessionOutput.out().println(ApiServlet.HTML_STATEMENT_SEPARATOR);
@@ -925,7 +925,7 @@
                                 "Failed to create job spec for replicating 
Files Index For external dataset");
                     }
                     filesIndexReplicated = true;
-                    runJob(hcc, spec, jobFlags);
+                    runJob(hcc, spec, jobFlags, Long.MAX_VALUE);
                 }
             }
 
@@ -956,7 +956,7 @@
             bActiveTxn = false;
             progress = ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA;
             // #. create the index artifact in NC.
-            runJob(hcc, spec, jobFlags);
+            runJob(hcc, spec, jobFlags, Long.MAX_VALUE);
 
             // #. flush the internal dataset for correlated policy
             if (ds.isCorrelated() && ds.getDatasetType() == 
DatasetType.INTERNAL) {
@@ -972,7 +972,7 @@
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
             bActiveTxn = false;
 
-            runJob(hcc, spec, jobFlags);
+            runJob(hcc, spec, jobFlags, Long.MAX_VALUE);
 
             // #. begin new metadataTxn
             mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
@@ -1009,7 +1009,7 @@
                             
ExternalIndexingOperations.buildDropFilesIndexJobSpec(metadataProvider, ds);
                     MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
                     bActiveTxn = false;
-                    runJob(hcc, jobSpec, jobFlags);
+                    runJob(hcc, jobSpec, jobFlags, Long.MAX_VALUE);
                 } catch (Exception e2) {
                     e.addSuppressed(e2);
                     if (bActiveTxn) {
@@ -1028,7 +1028,7 @@
                     JobSpecification jobSpec = 
IndexUtil.buildDropIndexJobSpec(index, metadataProvider, ds);
                     MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
                     bActiveTxn = false;
-                    runJob(hcc, jobSpec, jobFlags);
+                    runJob(hcc, jobSpec, jobFlags, Long.MAX_VALUE);
                 } catch (Exception e2) {
                     e.addSuppressed(e2);
                     if (bActiveTxn) {
@@ -1346,7 +1346,8 @@
                 // remove the all indexes in NC
                 try {
                     for (JobSpecification jobSpec : jobsToExecute) {
-                        JobUtils.runJob(hcc, jobSpec, true);
+                        JobId jobId = JobUtils.runJob(hcc, jobSpec);
+                        hcc.waitForCompletion(jobId, Long.MAX_VALUE);
                     }
                 } catch (Exception e2) {
                     // do no throw exception since still the metadata needs to 
be compensated.
@@ -2340,7 +2341,7 @@
                     final ResultReader resultReader = new ResultReader(hdc, 
id, resultSetId);
                     ResultUtil.printResults(appCtx, resultReader, 
sessionOutput, stats,
                             metadataProvider.findOutputRecordType());
-                }, clientContextId, ctx);
+                }, clientContextId, ctx, sessionConfig.getTimeout());
                 break;
             case DEFERRED:
                 createAndRunJob(hcc, jobFlags, null, compiler, locker, 
resultDelivery, id -> {
@@ -2349,7 +2350,7 @@
                         outMetadata.getResultSets()
                                 .add(Triple.of(id, resultSetId, 
metadataProvider.findOutputRecordType()));
                     }
-                }, clientContextId, ctx);
+                }, clientContextId, ctx, sessionConfig.getTimeout());
                 break;
             default:
                 break;
@@ -2369,7 +2370,7 @@
                     printed.setTrue();
                     printed.notify();
                 }
-            }, clientContextId, ctx);
+            }, clientContextId, ctx, sessionConfig.getTimeout());
         } catch (Exception e) {
             if (JobId.INVALID.equals(jobId.getValue())) {
                 // compilation failed
@@ -2390,24 +2391,25 @@
     }
 
     private void runJob(IHyracksClientConnection hcc, JobSpecification 
jobSpec) throws Exception {
-        runJob(hcc, jobSpec, jobFlags);
+        runJob(hcc, jobSpec, jobFlags, sessionOutput.config().getTimeout());
     }
 
-    private static void runJob(IHyracksClientConnection hcc, JobSpecification 
jobSpec, EnumSet<JobFlag> jobFlags)
-            throws Exception {
-        JobUtils.runJob(hcc, jobSpec, jobFlags, true);
+    private static void runJob(IHyracksClientConnection hcc, JobSpecification 
jobSpec, EnumSet<JobFlag> jobFlags,
+            long timeout) throws Exception {
+        JobId jobId = JobUtils.runJob(hcc, jobSpec, jobFlags);
+        hcc.waitForCompletion(jobId, timeout);
     }
 
     private static void createAndRunJob(IHyracksClientConnection hcc, 
EnumSet<JobFlag> jobFlags, Mutable<JobId> jId,
             IStatementCompiler compiler, IMetadataLocker locker, 
ResultDelivery resultDelivery, IResultPrinter printer,
-            String clientContextId, IStatementExecutorContext ctx) throws 
Exception {
+            String clientContextId, IStatementExecutorContext ctx, long 
timeout) throws Exception {
         locker.lock();
         try {
             final JobSpecification jobSpec = compiler.compile();
             if (jobSpec == null) {
                 return;
             }
-            final JobId jobId = JobUtils.runJob(hcc, jobSpec, jobFlags, false);
+            final JobId jobId = JobUtils.runJob(hcc, jobSpec, jobFlags);
             if (ctx != null && clientContextId != null) {
                 ctx.put(clientContextId, jobId); // Adds the running job into 
the context.
             }
@@ -2416,9 +2418,9 @@
             }
             if (ResultDelivery.ASYNC == resultDelivery) {
                 printer.print(jobId);
-                hcc.waitForCompletion(jobId);
+                hcc.waitForCompletion(jobId, timeout);
             } else {
-                hcc.waitForCompletion(jobId);
+                hcc.waitForCompletion(jobId, timeout);
                 printer.print(jobId);
             }
         } finally {
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
index e8636c8..b1a3235 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
@@ -106,6 +106,7 @@
     protected WebManager webManager;
     protected CcApplicationContext appCtx;
     private IJobCapacityController jobCapacityController;
+    private IHyracksClientConnection hcc;
 
     @Override
     public void start(IServiceContext serviceCtx, String[] args) throws 
Exception {
@@ -121,9 +122,11 @@
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("Starting Asterix cluster controller");
         }
-
         ccServiceCtx.setThreadFactory(
                 new AsterixThreadFactory(ccServiceCtx.getThreadFactory(), new 
LifeCycleComponentManager()));
+        String strIP = 
ccServiceCtx.getCCContext().getClusterControllerInfo().getClientNetAddress();
+        int port = 
ccServiceCtx.getCCContext().getClusterControllerInfo().getClientNetPort();
+        hcc = new HyracksConnection(strIP, port);
         ILibraryManager libraryManager = new ExternalLibraryManager();
         ResourceIdManager resourceIdManager = new ResourceIdManager();
         IReplicationStrategy repStrategy = 
ClusterProperties.INSTANCE.getReplicationStrategy();
@@ -357,9 +360,7 @@
         return appCtx;
     }
 
-    protected IHyracksClientConnection getHcc() throws Exception {
-        String strIP = 
ccServiceCtx.getCCContext().getClusterControllerInfo().getClientNetAddress();
-        int port = 
ccServiceCtx.getCCContext().getClusterControllerInfo().getClientNetPort();
-        return new HyracksConnection(strIP, port);
+    public IHyracksClientConnection getHcc() throws Exception {
+        return hcc;
     }
 }
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
index 3209557..40b6c90 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
@@ -77,7 +77,7 @@
     private void executeHyracksJob(JobSpecification spec) throws Exception {
         spec.setMaxReattempts(0);
         JobId jobId = hcc.startJob(spec);
-        hcc.waitForCompletion(jobId);
+        hcc.waitForCompletion(jobId, Long.MAX_VALUE);
     }
 
     @Override
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FlushDatasetUtil.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FlushDatasetUtil.java
index 958444c..d9ddb4f 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FlushDatasetUtil.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FlushDatasetUtil.java
@@ -35,6 +35,7 @@
 import org.apache.hyracks.api.client.IHyracksClientConnection;
 import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobSpecification;
 import org.apache.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
 import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
@@ -60,13 +61,13 @@
                 new IPushRuntimeFactory[] { new 
EmptyTupleSourceRuntimeFactory() }, rDescs);
 
         org.apache.asterix.common.transactions.JobId jobId = 
JobIdFactory.generateJobId();
-        FlushDatasetOperatorDescriptor flushOperator = new 
FlushDatasetOperatorDescriptor(spec, jobId,
-                dataset.getDatasetId());
+        FlushDatasetOperatorDescriptor flushOperator =
+                new FlushDatasetOperatorDescriptor(spec, jobId, 
dataset.getDatasetId());
 
         spec.connect(new OneToOneConnectorDescriptor(spec), emptySource, 0, 
flushOperator, 0);
 
-        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> 
primarySplitsAndConstraint = metadataProvider
-                .getSplitProviderAndConstraints(dataset, 
dataset.getDatasetName());
+        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> 
primarySplitsAndConstraint =
+                metadataProvider.getSplitProviderAndConstraints(dataset, 
dataset.getDatasetName());
         AlgebricksPartitionConstraint primaryPartitionConstraint = 
primarySplitsAndConstraint.second;
 
         
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, 
emptySource,
@@ -74,7 +75,8 @@
 
         JobEventListenerFactory jobEventListenerFactory = new 
JobEventListenerFactory(jobId, true);
         spec.setJobletEventListenerFactory(jobEventListenerFactory);
-        JobUtils.runJob(hcc, spec, true);
+        JobId hyracksJobId = JobUtils.runJob(hcc, spec);
+        hcc.waitForCompletion(hyracksJobId, Long.MAX_VALUE);
     }
 
 }
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
index ceaf4cf..62ca32c 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
@@ -114,9 +114,9 @@
 
             if (!targetNcNames.isEmpty()) {
                 // Creates a node group for rebalance.
-                String nodeGroupName = DatasetUtil
-                        
.createNodeGroupForNewDataset(sourceDataset.getDataverseName(), 
sourceDataset.getDatasetName(),
-                                sourceDataset.getRebalanceCount() + 1, 
targetNcNames, metadataProvider);
+                String nodeGroupName = 
DatasetUtil.createNodeGroupForNewDataset(sourceDataset.getDataverseName(),
+                        sourceDataset.getDatasetName(), 
sourceDataset.getRebalanceCount() + 1, targetNcNames,
+                        metadataProvider);
                 // The target dataset for rebalance.
                 targetDataset = 
sourceDataset.getTargetDatasetForRebalance(nodeGroupName);
 
@@ -266,7 +266,8 @@
     private static void createRebalanceTarget(Dataset target, MetadataProvider 
metadataProvider,
             IHyracksClientConnection hcc) throws Exception {
         JobSpecification spec = DatasetUtil.createDatasetJobSpec(target, 
metadataProvider);
-        JobUtils.runJob(hcc, spec, true);
+        org.apache.hyracks.api.job.JobId jobId = JobUtils.runJob(hcc, spec);
+        hcc.waitForCompletion(jobId, Long.MAX_VALUE);
     }
 
     // Populates the data from the source dataset to the rebalance target 
dataset.
@@ -303,7 +304,8 @@
         spec.connect(new OneToOneConnectorDescriptor(spec), upsertOp, 0, 
commitOp, 0);
 
         // Executes the job.
-        JobUtils.runJob(hcc, spec, true);
+        org.apache.hyracks.api.job.JobId hyracksJobId = JobUtils.runJob(hcc, 
spec);
+        hcc.waitForCompletion(hyracksJobId, Long.MAX_VALUE);
     }
 
     // Creates the primary index upsert operator for populating the target 
dataset.
@@ -341,7 +343,8 @@
             jobs.add(IndexUtil.buildDropIndexJobSpec(index, metadataProvider, 
dataset, true));
         }
         for (JobSpecification jobSpec : jobs) {
-            JobUtils.runJob(hcc, jobSpec, true);
+            org.apache.hyracks.api.job.JobId jobId = JobUtils.runJob(hcc, 
jobSpec);
+            hcc.waitForCompletion(jobId, Long.MAX_VALUE);
         }
     }
 
@@ -355,12 +358,14 @@
             // Creates the secondary index.
             JobSpecification indexCreationJobSpec =
                     IndexUtil.buildSecondaryIndexCreationJobSpec(target, 
index, metadataProvider);
-            JobUtils.runJob(hcc, indexCreationJobSpec, true);
+            org.apache.hyracks.api.job.JobId jobId = JobUtils.runJob(hcc, 
indexCreationJobSpec);
+            hcc.waitForCompletion(jobId, Long.MAX_VALUE);
 
             // Loads the secondary index.
             JobSpecification indexLoadingJobSpec =
                     IndexUtil.buildSecondaryIndexLoadingJobSpec(target, index, 
metadataProvider);
-            JobUtils.runJob(hcc, indexLoadingJobSpec, true);
+            jobId = JobUtils.runJob(hcc, indexLoadingJobSpec);
+            hcc.waitForCompletion(jobId, Long.MAX_VALUE);
         }
     }
 
@@ -393,9 +398,9 @@
             dropDatasetFiles(dataset, metadataProvider, hcc);
 
             // drop dataset entry from metadata
-            runMetadataTransaction(metadataProvider, () -> 
MetadataManager.INSTANCE
-                    .dropDataset(metadataProvider.getMetadataTxnContext(), 
dataset.getDataverseName(),
-                            dataset.getDatasetName()));
+            runMetadataTransaction(metadataProvider,
+                    () -> 
MetadataManager.INSTANCE.dropDataset(metadataProvider.getMetadataTxnContext(),
+                            dataset.getDataverseName(), 
dataset.getDatasetName()));
             // try to drop the dataset's node group
             runMetadataTransaction(metadataProvider, () -> 
tryDropDatasetNodegroup(dataset, metadataProvider));
         });
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/JobUtils.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/JobUtils.java
index cacbfbc..c60ebac 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/JobUtils.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/JobUtils.java
@@ -33,18 +33,13 @@
         ADDED_PENDINGOP_RECORD_TO_METADATA
     }
 
-    public static JobId runJob(IHyracksClientConnection hcc, JobSpecification 
spec, boolean waitForCompletion)
-            throws Exception {
-        return runJob(hcc, spec, EnumSet.noneOf(JobFlag.class), 
waitForCompletion);
+    public static JobId runJob(IHyracksClientConnection hcc, JobSpecification 
spec) throws Exception {
+        return runJob(hcc, spec, EnumSet.noneOf(JobFlag.class));
     }
 
-    public static JobId runJob(IHyracksClientConnection hcc, JobSpecification 
spec, EnumSet<JobFlag> jobFlags,
-            boolean waitForCompletion) throws Exception {
+    public static JobId runJob(IHyracksClientConnection hcc, JobSpecification 
spec, EnumSet<JobFlag> jobFlags)
+            throws Exception {
         spec.setMaxReattempts(0);
-        final JobId jobId = hcc.startJob(spec, jobFlags);
-        if (waitForCompletion) {
-            hcc.waitForCompletion(jobId);
-        }
-        return jobId;
+        return hcc.startJob(spec, jobFlags);
     }
 }
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
index fa60bba..910247b 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
@@ -350,12 +350,14 @@
 
             // # disconnect the feeds
             for (Pair<JobSpecification, Boolean> p : 
disconnectJobList.values()) {
-                JobUtils.runJob(hcc, p.first, true);
+                org.apache.hyracks.api.job.JobId jobId = JobUtils.runJob(hcc, 
p.first);
+                hcc.waitForCompletion(jobId, Long.MAX_VALUE);
             }
 
             // #. run the jobs
             for (JobSpecification jobSpec : jobsToExecute) {
-                JobUtils.runJob(hcc, jobSpec, true);
+                org.apache.hyracks.api.job.JobId jobId = JobUtils.runJob(hcc, 
jobSpec);
+                hcc.waitForCompletion(jobId, Long.MAX_VALUE);
             }
 
             mdTxnCtx.setValue(MetadataManager.INSTANCE.beginTransaction());
@@ -389,7 +391,8 @@
 
             // #. run the jobs
             for (JobSpecification jobSpec : jobsToExecute) {
-                JobUtils.runJob(hcc, jobSpec, true);
+                org.apache.hyracks.api.job.JobId jobId = JobUtils.runJob(hcc, 
jobSpec);
+                hcc.waitForCompletion(jobId, Long.MAX_VALUE);
             }
             if (!indexes.isEmpty()) {
                 ExternalDatasetsRegistry.INSTANCE.removeDatasetInfo(this);
diff --git 
a/hyracks-fullstack/algebricks/algebricks-tests/src/test/java/org/apache/hyracks/algebricks/tests/util/AlgebricksHyracksIntegrationUtil.java
 
b/hyracks-fullstack/algebricks/algebricks-tests/src/test/java/org/apache/hyracks/algebricks/tests/util/AlgebricksHyracksIntegrationUtil.java
index 2971b72..d2af495 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-tests/src/test/java/org/apache/hyracks/algebricks/tests/util/AlgebricksHyracksIntegrationUtil.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-tests/src/test/java/org/apache/hyracks/algebricks/tests/util/AlgebricksHyracksIntegrationUtil.java
@@ -65,8 +65,7 @@
         ncConfig1.setClusterListenAddress("127.0.0.1");
         ncConfig1.setDataListenAddress("127.0.0.1");
         ncConfig1.setResultListenAddress("127.0.0.1");
-        ncConfig1.setIODevices(new String [] { 
joinPath(System.getProperty("user.dir"), "target", "data",
-                "device0") });
+        ncConfig1.setIODevices(new String[] { 
joinPath(System.getProperty("user.dir"), "target", "data", "device0") });
         FileUtils.forceMkdir(new File(ncConfig1.getIODevices()[0]));
         nc1 = new NodeControllerService(ncConfig1);
         nc1.start();
@@ -77,8 +76,7 @@
         ncConfig2.setClusterListenAddress("127.0.0.1");
         ncConfig2.setDataListenAddress("127.0.0.1");
         ncConfig2.setResultListenAddress("127.0.0.1");
-        ncConfig2.setIODevices(new String [] { 
joinPath(System.getProperty("user.dir"), "target", "data",
-                "device1") });
+        ncConfig2.setIODevices(new String[] { 
joinPath(System.getProperty("user.dir"), "target", "data", "device1") });
         FileUtils.forceMkdir(new File(ncConfig1.getIODevices()[0]));
         nc2 = new NodeControllerService(ncConfig2);
         nc2.start();
@@ -96,7 +94,7 @@
         AlgebricksConfig.ALGEBRICKS_LOGGER.info(spec.toJSON().toString());
         JobId jobId = hcc.startJob(spec, EnumSet.of(JobFlag.PROFILE_RUNTIME));
         AlgebricksConfig.ALGEBRICKS_LOGGER.info(jobId.toString());
-        hcc.waitForCompletion(jobId);
+        hcc.waitForCompletion(jobId, Long.MAX_VALUE);
     }
 
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java
index e2868ae..15e5952 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java
@@ -130,6 +130,9 @@
 
         public CancelJobFunction(JobId jobId) {
+            if (jobId == null) {
+                throw new NullPointerException("jobId");
+            }
             this.jobId = jobId;
         }
 
         @Override
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
index 0142c7d..2718ac3 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
@@ -37,6 +37,7 @@
 
 public class HyracksClientInterfaceRemoteProxy implements 
IHyracksClientInterface {
     private static final int SHUTDOWN_CONNECTION_TIMEOUT_SECS = 30;
+    private static final long DEFAULT_TIMEOUT = Long.MAX_VALUE;
 
     private final IIPCHandle ipcHandle;
 
@@ -51,112 +52,113 @@
     public ClusterControllerInfo getClusterControllerInfo() throws Exception {
         HyracksClientInterfaceFunctions.GetClusterControllerInfoFunction gccif 
=
                 new 
HyracksClientInterfaceFunctions.GetClusterControllerInfoFunction();
-        return (ClusterControllerInfo) rpci.call(ipcHandle, gccif);
+        return (ClusterControllerInfo) rpci.call(ipcHandle, gccif, 
DEFAULT_TIMEOUT);
     }
 
     @Override
     public JobStatus getJobStatus(JobId jobId) throws Exception {
         HyracksClientInterfaceFunctions.GetJobStatusFunction gjsf =
                 new 
HyracksClientInterfaceFunctions.GetJobStatusFunction(jobId);
-        return (JobStatus) rpci.call(ipcHandle, gjsf);
+        return (JobStatus) rpci.call(ipcHandle, gjsf, DEFAULT_TIMEOUT);
     }
 
     @Override
     public JobId startJob(byte[] acggfBytes, EnumSet<JobFlag> jobFlags) throws 
Exception {
         HyracksClientInterfaceFunctions.StartJobFunction sjf =
                 new 
HyracksClientInterfaceFunctions.StartJobFunction(acggfBytes, jobFlags);
-        return (JobId) rpci.call(ipcHandle, sjf);
+        return (JobId) rpci.call(ipcHandle, sjf, DEFAULT_TIMEOUT);
     }
 
     @Override
     public void cancelJob(JobId jobId) throws Exception {
-        HyracksClientInterfaceFunctions.CancelJobFunction cjf = new 
HyracksClientInterfaceFunctions.CancelJobFunction(
-                jobId);
-        rpci.call(ipcHandle, cjf);
+        HyracksClientInterfaceFunctions.CancelJobFunction cjf =
+                new HyracksClientInterfaceFunctions.CancelJobFunction(jobId);
+        rpci.call(ipcHandle, cjf, DEFAULT_TIMEOUT);
     }
 
     @Override
     public JobId startJob(JobId jobId) throws Exception {
         HyracksClientInterfaceFunctions.StartJobFunction sjf =
                 new HyracksClientInterfaceFunctions.StartJobFunction(jobId);
-        return (JobId) rpci.call(ipcHandle, sjf);
+        return (JobId) rpci.call(ipcHandle, sjf, DEFAULT_TIMEOUT);
     }
 
     @Override
     public JobId startJob(DeploymentId deploymentId, byte[] acggfBytes, 
EnumSet<JobFlag> jobFlags) throws Exception {
-        HyracksClientInterfaceFunctions.StartJobFunction sjf = new 
HyracksClientInterfaceFunctions.StartJobFunction(
-                deploymentId, acggfBytes, jobFlags);
-        return (JobId) rpci.call(ipcHandle, sjf);
+        HyracksClientInterfaceFunctions.StartJobFunction sjf =
+                new 
HyracksClientInterfaceFunctions.StartJobFunction(deploymentId, acggfBytes, 
jobFlags);
+        return (JobId) rpci.call(ipcHandle, sjf, DEFAULT_TIMEOUT);
     }
 
     @Override
     public JobId distributeJob(byte[] acggfBytes) throws Exception {
         HyracksClientInterfaceFunctions.DistributeJobFunction sjf =
                 new 
HyracksClientInterfaceFunctions.DistributeJobFunction(acggfBytes);
-        return (JobId) rpci.call(ipcHandle, sjf);
+        return (JobId) rpci.call(ipcHandle, sjf, DEFAULT_TIMEOUT);
     }
 
     @Override
     public JobId destroyJob(JobId jobId) throws Exception {
         HyracksClientInterfaceFunctions.DestroyJobFunction sjf =
                 new HyracksClientInterfaceFunctions.DestroyJobFunction(jobId);
-        return (JobId) rpci.call(ipcHandle, sjf);
+        return (JobId) rpci.call(ipcHandle, sjf, DEFAULT_TIMEOUT);
     }
 
     @Override
     public NetworkAddress getDatasetDirectoryServiceInfo() throws Exception {
         HyracksClientInterfaceFunctions.GetDatasetDirectoryServiceInfoFunction 
gddsf =
                 new 
HyracksClientInterfaceFunctions.GetDatasetDirectoryServiceInfoFunction();
-        return (NetworkAddress) rpci.call(ipcHandle, gddsf);
+        return (NetworkAddress) rpci.call(ipcHandle, gddsf, DEFAULT_TIMEOUT);
     }
 
     @Override
-    public void waitForCompletion(JobId jobId) throws Exception {
+    public void waitForCompletion(JobId jobId, long timeout) throws Exception {
         HyracksClientInterfaceFunctions.WaitForCompletionFunction wfcf =
                 new 
HyracksClientInterfaceFunctions.WaitForCompletionFunction(jobId);
-        rpci.call(ipcHandle, wfcf);
+        rpci.call(ipcHandle, wfcf, timeout);
     }
 
+    @SuppressWarnings("unchecked")
     @Override
     public Map<String, NodeControllerInfo> getNodeControllersInfo() throws 
Exception {
         HyracksClientInterfaceFunctions.GetNodeControllersInfoFunction gncif =
                 new 
HyracksClientInterfaceFunctions.GetNodeControllersInfoFunction();
-        return (Map<String, NodeControllerInfo>) rpci.call(ipcHandle, gncif);
+        return (Map<String, NodeControllerInfo>) rpci.call(ipcHandle, gncif, 
DEFAULT_TIMEOUT);
     }
 
     @Override
     public ClusterTopology getClusterTopology() throws Exception {
         HyracksClientInterfaceFunctions.GetClusterTopologyFunction gctf =
                 new 
HyracksClientInterfaceFunctions.GetClusterTopologyFunction();
-        return (ClusterTopology) rpci.call(ipcHandle, gctf);
+        return (ClusterTopology) rpci.call(ipcHandle, gctf, DEFAULT_TIMEOUT);
     }
 
     @Override
     public void deployBinary(List<URL> binaryURLs, DeploymentId deploymentId) 
throws Exception {
         HyracksClientInterfaceFunctions.CliDeployBinaryFunction dbf =
                 new 
HyracksClientInterfaceFunctions.CliDeployBinaryFunction(binaryURLs, 
deploymentId);
-        rpci.call(ipcHandle, dbf);
+        rpci.call(ipcHandle, dbf, DEFAULT_TIMEOUT);
     }
 
     @Override
     public void unDeployBinary(DeploymentId deploymentId) throws Exception {
         HyracksClientInterfaceFunctions.CliUnDeployBinaryFunction dbf =
                 new 
HyracksClientInterfaceFunctions.CliUnDeployBinaryFunction(deploymentId);
-        rpci.call(ipcHandle, dbf);
+        rpci.call(ipcHandle, dbf, DEFAULT_TIMEOUT);
     }
 
     @Override
     public JobInfo getJobInfo(JobId jobId) throws Exception {
         HyracksClientInterfaceFunctions.GetJobInfoFunction gjsf =
                 new HyracksClientInterfaceFunctions.GetJobInfoFunction(jobId);
-        return (JobInfo) rpci.call(ipcHandle, gjsf);
+        return (JobInfo) rpci.call(ipcHandle, gjsf, DEFAULT_TIMEOUT);
     }
 
     @Override
     public void stopCluster(boolean terminateNCService) throws Exception {
         HyracksClientInterfaceFunctions.ClusterShutdownFunction csdf =
                 new 
HyracksClientInterfaceFunctions.ClusterShutdownFunction(terminateNCService);
-        rpci.call(ipcHandle, csdf);
+        rpci.call(ipcHandle, csdf, DEFAULT_TIMEOUT);
         int i = 0;
         // give the CC some time to do final settling after it returns our 
request
         while (ipcHandle.isConnected() && i++ < 
SHUTDOWN_CONNECTION_TIMEOUT_SECS) {
@@ -165,8 +167,8 @@
             }
         }
         if (ipcHandle.isConnected()) {
-            throw new IPCException("CC refused to release connection after " + 
SHUTDOWN_CONNECTION_TIMEOUT_SECS
-                    + " seconds");
+            throw new IPCException(
+                    "CC refused to release connection after " + 
SHUTDOWN_CONNECTION_TIMEOUT_SECS + " seconds");
         }
     }
 
@@ -174,13 +176,13 @@
     public String getNodeDetailsJSON(String nodeId, boolean includeStats, 
boolean includeConfig) throws Exception {
         HyracksClientInterfaceFunctions.GetNodeDetailsJSONFunction gjsf =
                 new 
HyracksClientInterfaceFunctions.GetNodeDetailsJSONFunction(nodeId, 
includeStats, includeConfig);
-        return (String) rpci.call(ipcHandle, gjsf);
+        return (String) rpci.call(ipcHandle, gjsf, DEFAULT_TIMEOUT);
     }
 
     @Override
     public String getThreadDump(String node) throws Exception {
         HyracksClientInterfaceFunctions.ThreadDumpFunction tdf =
                 new HyracksClientInterfaceFunctions.ThreadDumpFunction(node);
-        return (String)rpci.call(ipcHandle, tdf);
+        return (String) rpci.call(ipcHandle, tdf, DEFAULT_TIMEOUT);
     }
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
index 75cbf61..7b5dfe8 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
@@ -26,6 +26,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.TimeoutException;
 
 import org.apache.http.HttpResponse;
 import org.apache.http.client.methods.HttpPut;
@@ -124,6 +125,7 @@
         return hci.startJob(jobId);
     }
 
+    @Override
     public JobId startJob(IActivityClusterGraphGeneratorFactory acggf, 
EnumSet<JobFlag> jobFlags) throws Exception {
         return hci.startJob(JavaSerializationUtils.serialize(acggf), jobFlags);
     }
@@ -132,15 +134,16 @@
         return hci.distributeJob(JavaSerializationUtils.serialize(acggf));
     }
 
+    @Override
     public NetworkAddress getDatasetDirectoryServiceInfo() throws Exception {
         return hci.getDatasetDirectoryServiceInfo();
     }
 
     @Override
-    public void waitForCompletion(JobId jobId) throws Exception {
+    public void waitForCompletion(JobId jobId, long timeout) throws Exception {
         try {
-            hci.waitForCompletion(jobId);
-        } catch (InterruptedException e) {
+            hci.waitForCompletion(jobId, timeout);
+        } catch (InterruptedException | TimeoutException e) {
             // Cancels an on-going job if the current thread gets interrupted.
             hci.cancelJob(jobId);
             throw e;
@@ -152,7 +155,7 @@
         try {
             return hci.getNodeControllersInfo();
         } catch (Exception e) {
-            throw new HyracksException(e);
+            throw HyracksException.create(e);
         }
     }
 
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java
index 0956d85..49371c4 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java
@@ -141,8 +141,7 @@
      *            JobId of the Job
      * @throws Exception
      */
-    public void waitForCompletion(JobId jobId) throws Exception;
-
+    public void waitForCompletion(JobId jobId, long timeout) throws Exception;
 
     /**
      * Deploy the user-defined jars to the cluster
@@ -201,16 +200,19 @@
 
     /**
      * Shuts down all NCs and then the CC.
+     *
      * @param terminateNCService
      */
     public void stopCluster(boolean terminateNCService) throws Exception;
 
     /**
      * Get details of specified node as JSON object
+     *
      * @param nodeId
-     *              id the subject node
+     *            id the subject node
      * @param includeStats
-     * @param includeConfig @return serialized JSON containing the node details
+     * @param includeConfig
+     * @return serialized JSON containing the node details
      * @throws Exception
      */
     public String getNodeDetailsJSON(String nodeId, boolean includeStats, 
boolean includeConfig) throws Exception;
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java
index 1afbe9e..7e3715a 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java
@@ -48,7 +48,7 @@
 
     public NetworkAddress getDatasetDirectoryServiceInfo() throws Exception;
 
-    public void waitForCompletion(JobId jobId) throws Exception;
+    public void waitForCompletion(JobId jobId, long timeout) throws Exception;
 
     public Map<String, NodeControllerInfo> getNodeControllersInfo() throws 
Exception;
 
diff --git 
a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/dataset/HyracksDatasetDirectoryServiceInterfaceRemoteProxy.java
 
b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/dataset/HyracksDatasetDirectoryServiceInterfaceRemoteProxy.java
index 4310cd0..7abb22f 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/dataset/HyracksDatasetDirectoryServiceInterfaceRemoteProxy.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/dataset/HyracksDatasetDirectoryServiceInterfaceRemoteProxy.java
@@ -40,16 +40,16 @@
 
     @Override
     public Status getDatasetResultStatus(JobId jobId, ResultSetId rsId) throws 
Exception {
-        HyracksClientInterfaceFunctions.GetDatasetResultStatusFunction gdrlf = 
new HyracksClientInterfaceFunctions.GetDatasetResultStatusFunction(
-                jobId, rsId);
-        return (Status) rpci.call(ipcHandle, gdrlf);
+        HyracksClientInterfaceFunctions.GetDatasetResultStatusFunction gdrlf =
+                new 
HyracksClientInterfaceFunctions.GetDatasetResultStatusFunction(jobId, rsId);
+        return (Status) rpci.call(ipcHandle, gdrlf, Long.MAX_VALUE);
     }
 
     @Override
     public DatasetDirectoryRecord[] getDatasetResultLocations(JobId jobId, 
ResultSetId rsId,
             DatasetDirectoryRecord[] knownRecords) throws Exception {
-        HyracksClientInterfaceFunctions.GetDatasetResultLocationsFunction 
gdrlf = new HyracksClientInterfaceFunctions.GetDatasetResultLocationsFunction(
-                jobId, rsId, knownRecords);
-        return (DatasetDirectoryRecord[]) rpci.call(ipcHandle, gdrlf);
+        HyracksClientInterfaceFunctions.GetDatasetResultLocationsFunction 
gdrlf =
+                new 
HyracksClientInterfaceFunctions.GetDatasetResultLocationsFunction(jobId, rsId, 
knownRecords);
+        return (DatasetDirectoryRecord[]) rpci.call(ipcHandle, gdrlf, 
Long.MAX_VALUE);
     }
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-client/src/test/java/org/apache/hyracks/client/stats/HyracksUtils.java
 
b/hyracks-fullstack/hyracks/hyracks-client/src/test/java/org/apache/hyracks/client/stats/HyracksUtils.java
index 54ae838..1b253d9 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-client/src/test/java/org/apache/hyracks/client/stats/HyracksUtils.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-client/src/test/java/org/apache/hyracks/client/stats/HyracksUtils.java
@@ -94,7 +94,7 @@
     public static void runJob(JobSpecification spec, String appName) throws 
Exception {
         spec.setFrameSize(FRAME_SIZE);
         JobId jobId = hcc.startJob(spec, EnumSet.of(JobFlag.PROFILE_RUNTIME));
-        hcc.waitForCompletion(jobId);
+        hcc.waitForCompletion(jobId, Long.MAX_VALUE);
     }
 
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
index dbbaf9f..bde9b9b 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
@@ -65,6 +65,8 @@
 import org.apache.hyracks.control.cc.work.JobCleanupWork;
 import org.apache.hyracks.control.common.job.PartitionState;
 import org.apache.hyracks.control.common.job.TaskAttemptDescriptor;
+import org.apache.hyracks.control.common.work.DummyCallback;
+import org.apache.hyracks.control.common.work.IResultCallback;
 
 public class JobExecutor {
     private static final Logger LOGGER = 
Logger.getLogger(JobExecutor.class.getName());
@@ -114,9 +116,10 @@
         ccs.getContext().notifyJobStart(jobRun.getJobId());
     }
 
-    public void cancelJob() throws HyracksException {
+    public void cancelJob(IResultCallback<Void> callback) throws 
HyracksException {
         // If the job is already terminated or failed, do nothing here.
         if (jobRun.getPendingStatus() != null) {
+            callback.setValue(null);
             return;
         }
         // Sets the cancelled flag.
@@ -124,7 +127,8 @@
         // Aborts on-ongoing task clusters.
         abortOngoingTaskClusters(ta -> false, ta -> null);
         // Aborts the whole job.
-        
abortJob(Collections.singletonList(HyracksException.create(ErrorCode.JOB_CANCELED,
 jobRun.getJobId())));
+        
abortJob(Collections.singletonList(HyracksException.create(ErrorCode.JOB_CANCELED,
 jobRun.getJobId())),
+                callback);
     }
 
     private void findRunnableTaskClusterRoots(Set<TaskCluster> frontier, 
Collection<ActivityCluster> roots)
@@ -196,8 +200,8 @@
                     "Runnable TC roots: " + taskClusterRoots + ", 
inProgressTaskClusters: " + inProgressTaskClusters);
         }
         if (taskClusterRoots.isEmpty() && inProgressTaskClusters.isEmpty()) {
-            ccs.getWorkQueue()
-                    .schedule(new JobCleanupWork(ccs.getJobManager(), 
jobRun.getJobId(), JobStatus.TERMINATED, null));
+            ccs.getWorkQueue().schedule(new 
JobCleanupWork(ccs.getJobManager(), jobRun.getJobId(), JobStatus.TERMINATED,
+                    null, DummyCallback.INSTANCE));
             return;
         }
         startRunnableTaskClusters(taskClusterRoots);
@@ -520,14 +524,14 @@
         }
     }
 
-    public void abortJob(List<Exception> exceptions) {
+    public void abortJob(List<Exception> exceptions, IResultCallback<Void> 
callback) {
         Set<TaskCluster> inProgressTaskClustersCopy = new 
HashSet<>(inProgressTaskClusters);
         for (TaskCluster tc : inProgressTaskClustersCopy) {
             abortTaskCluster(findLastTaskClusterAttempt(tc), 
TaskClusterAttempt.TaskClusterStatus.ABORTED);
         }
         assert inProgressTaskClusters.isEmpty();
-        ccs.getWorkQueue()
-                .schedule(new JobCleanupWork(ccs.getJobManager(), 
jobRun.getJobId(), JobStatus.FAILURE, exceptions));
+        ccs.getWorkQueue().schedule(
+                new JobCleanupWork(ccs.getJobManager(), jobRun.getJobId(), 
JobStatus.FAILURE, exceptions, callback));
     }
 
     private void abortTaskCluster(TaskClusterAttempt tcAttempt,
@@ -686,7 +690,7 @@
                         + " as failed and the number of max re-attempts = " + 
maxReattempts);
                 if (lastAttempt.getAttempt() >= maxReattempts || 
isCancelled()) {
                     LOGGER.log(Level.INFO, "Aborting the job of " + 
ta.getTaskAttemptId());
-                    abortJob(exceptions);
+                    abortJob(exceptions, DummyCallback.INSTANCE);
                     return;
                 }
                 LOGGER.log(Level.INFO, "We will try to start runnable activity 
clusters of " + ta.getTaskAttemptId());
@@ -696,7 +700,7 @@
                         "Ignoring task failure notification: " + taId + " -- 
Current last attempt = " + lastAttempt);
             }
         } catch (Exception e) {
-            abortJob(Collections.singletonList(e));
+            abortJob(Collections.singletonList(e), DummyCallback.INSTANCE);
         }
     }
 
@@ -720,7 +724,7 @@
                     ta -> HyracksException.create(ErrorCode.NODE_FAILED, 
ta.getNodeId()));
             startRunnableActivityClusters();
         } catch (Exception e) {
-            abortJob(Collections.singletonList(e));
+            abortJob(Collections.singletonList(e), DummyCallback.INSTANCE);
         }
     }
 
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/IJobManager.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/IJobManager.java
index 8fe542f..5ae19a1 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/IJobManager.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/IJobManager.java
@@ -25,6 +25,7 @@
 import org.apache.hyracks.api.exceptions.HyracksException;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobStatus;
+import org.apache.hyracks.control.common.work.IResultCallback;
 
 /**
  * This interface abstracts the job lifecycle management and job scheduling 
for a cluster.
@@ -46,11 +47,13 @@
 
     /**
      * Cancel a job with a given job id.
+     *
+     * @param callback the callback to be invoked once job cancellation has completed
      *
      * @param jobId,
      *            the id of the job.
      */
-    void cancel(JobId jobId) throws HyracksException;
+    void cancel(JobId jobId, IResultCallback<Void> callback) throws 
HyracksException;
 
     /**
      * This method is called when the master process decides to complete job.
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
index abf1d57..117cbe1 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
@@ -46,6 +46,8 @@
 import org.apache.hyracks.control.cc.scheduler.FIFOJobQueue;
 import org.apache.hyracks.control.cc.scheduler.IJobQueue;
 import org.apache.hyracks.control.common.controllers.CCConfig;
+import org.apache.hyracks.control.common.work.DummyCallback;
+import org.apache.hyracks.control.common.work.IResultCallback;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
@@ -115,17 +117,14 @@
     }
 
     @Override
-    public void cancel(JobId jobId) throws HyracksException {
-        if (jobId == null) {
-            return;
-        }
+    public void cancel(JobId jobId, IResultCallback<Void> callback) throws 
HyracksException {
         // Cancels a running job.
         if (activeRunMap.containsKey(jobId)) {
             JobRun jobRun = activeRunMap.get(jobId);
             // The following call will abort all ongoing tasks and then 
consequently
             // trigger JobCleanupWork and JobCleanupNotificationWork which 
will update the lifecyle of the job.
             // Therefore, we do not remove the job out of activeRunMap here.
-            jobRun.getExecutor().cancelJob();
+            jobRun.getExecutor().cancelJob(callback);
             return;
         }
         // Removes a pending job.
@@ -138,6 +137,7 @@
             runMapArchive.put(jobId, jobRun);
             runMapHistory.put(jobId, exceptions);
         }
+        callback.setValue(null);
     }
 
     @Override
@@ -322,7 +322,7 @@
             // fail the job then abort it
             run.setStatus(JobStatus.FAILURE, exceptions);
             // abort job will trigger JobCleanupWork
-            run.getExecutor().abortJob(exceptions);
+            run.getExecutor().abortJob(exceptions, DummyCallback.INSTANCE);
         }
     }
 
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/CancelJobWork.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/CancelJobWork.java
index f3b67c9..e3135df 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/CancelJobWork.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/CancelJobWork.java
@@ -42,10 +42,7 @@
     @Override
     protected void doRun() throws Exception {
         try {
-            if (jobId != null) {
-                jobManager.cancel(jobId);
-            }
-            callback.setValue(null);
+            jobManager.cancel(jobId, callback);
         } catch (Exception e) {
             callback.setException(e);
         }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobCleanupWork.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobCleanupWork.java
index 502ac50..bb85c13 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobCleanupWork.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobCleanupWork.java
@@ -29,6 +29,7 @@
 import org.apache.hyracks.control.cc.job.IJobManager;
 import org.apache.hyracks.control.cc.job.JobRun;
 import org.apache.hyracks.control.common.work.AbstractWork;
+import org.apache.hyracks.control.common.work.IResultCallback;
 
 public class JobCleanupWork extends AbstractWork {
     private static final Logger LOGGER = 
Logger.getLogger(JobCleanupWork.class.getName());
@@ -37,12 +38,15 @@
     private JobId jobId;
     private JobStatus status;
     private List<Exception> exceptions;
+    private IResultCallback<Void> callback;
 
-    public JobCleanupWork(IJobManager jobManager, JobId jobId, JobStatus 
status, List<Exception> exceptions) {
+    public JobCleanupWork(IJobManager jobManager, JobId jobId, JobStatus 
status, List<Exception> exceptions,
+            IResultCallback<Void> callback) {
         this.jobManager = jobManager;
         this.jobId = jobId;
         this.status = status;
         this.exceptions = exceptions;
+        this.callback = callback;
     }
 
     @Override
@@ -53,6 +57,7 @@
         try {
             JobRun jobRun = jobManager.get(jobId);
             jobManager.prepareComplete(jobRun, status, exceptions);
+            callback.setValue(null);
         } catch (HyracksException e) {
             // Fail the job with the caught exception during final completion.
             JobRun run = jobManager.get(jobId);
@@ -62,6 +67,7 @@
             }
             completionException.add(0, e);
             run.setStatus(JobStatus.FAILURE, completionException);
+            callback.setException(e);
         }
     }
 
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/job/JobManagerTest.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/job/JobManagerTest.java
index ed2a740..f99d465 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/job/JobManagerTest.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/job/JobManagerTest.java
@@ -48,6 +48,7 @@
 import org.apache.hyracks.control.common.base.INodeController;
 import org.apache.hyracks.control.common.controllers.CCConfig;
 import org.apache.hyracks.control.common.logs.LogFile;
+import org.apache.hyracks.control.common.work.DummyCallback;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -207,7 +208,7 @@
     }
 
     @Test
-    public void testCancel() throws HyracksException {
+    public void testCancel() throws Exception {
         CCConfig ccConfig = new CCConfig();
         IJobCapacityController jobCapacityController = 
mock(IJobCapacityController.class);
         IJobManager jobManager = spy(new JobManager(ccConfig, 
mockClusterControllerService(), jobCapacityController));
@@ -247,12 +248,12 @@
 
         // Cancels deferred jobs.
         for (JobRun run : deferredRuns) {
-            jobManager.cancel(run.getJobId());
+            jobManager.cancel(run.getJobId(), DummyCallback.INSTANCE);
         }
 
         // Cancels runnable jobs.
         for (JobRun run : acceptedRuns) {
-            jobManager.cancel(run.getJobId());
+            jobManager.cancel(run.getJobId(), DummyCallback.INSTANCE);
         }
 
         Assert.assertTrue(jobManager.getPendingJobs().isEmpty());
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/work/DummyCallback.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/work/DummyCallback.java
new file mode 100644
index 0000000..24946f0
--- /dev/null
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/work/DummyCallback.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.common.work;
+
+public class DummyCallback implements IResultCallback<Void> {
+
+    public static final DummyCallback INSTANCE = new DummyCallback();
+
+    private DummyCallback() {
+    }
+
+    @Override
+    public void setValue(Void result) {
+        // Dummy is used when no callback is provided
+    }
+
+    @Override
+    public void setException(Exception e) {
+        // Dummy is used when no callback is provided
+    }
+
+}
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/work/IResultCallback.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/work/IResultCallback.java
index 0ccaf1d..b8eefe7 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/work/IResultCallback.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/work/IResultCallback.java
@@ -19,6 +19,7 @@
 package org.apache.hyracks.control.common.work;
 
 public interface IResultCallback<T> {
+
     public void setValue(T result);
 
     public void setException(Exception e);
diff --git 
a/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/org/apache/hyracks/examples/btree/client/InsertPipelineExample.java
 
b/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/org/apache/hyracks/examples/btree/client/InsertPipelineExample.java
index 48377e3..4cf1d12 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/org/apache/hyracks/examples/btree/client/InsertPipelineExample.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/org/apache/hyracks/examples/btree/client/InsertPipelineExample.java
@@ -89,7 +89,7 @@
 
         long start = System.currentTimeMillis();
         JobId jobId = hcc.startJob(job);
-        hcc.waitForCompletion(jobId);
+        hcc.waitForCompletion(jobId, Long.MAX_VALUE);
         long end = System.currentTimeMillis();
         System.err.println(start + " " + end + " " + (end - start));
     }
@@ -144,7 +144,8 @@
                                                         // B-Tree tuple, etc.
         IFileSplitProvider primarySplitProvider = 
JobHelper.createFileSplitProvider(splitNCs, options.primaryBTreeName);
 
-        IIndexDataflowHelperFactory primaryHelperFactory = new 
IndexDataflowHelperFactory(storageManager, primarySplitProvider);
+        IIndexDataflowHelperFactory primaryHelperFactory =
+                new IndexDataflowHelperFactory(storageManager, 
primarySplitProvider);
 
         // create operator descriptor
         TreeIndexInsertUpdateDeleteOperatorDescriptor primaryInsert =
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/org/apache/hyracks/examples/btree/client/PrimaryIndexBulkLoadExample.java b/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/org/apache/hyracks/examples/btree/client/PrimaryIndexBulkLoadExample.java
index 203d22c..dc13c32 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/org/apache/hyracks/examples/btree/client/PrimaryIndexBulkLoadExample.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/org/apache/hyracks/examples/btree/client/PrimaryIndexBulkLoadExample.java
@@ -84,7 +84,7 @@
         JobSpecification job = createJob(options);
         long start = System.currentTimeMillis();
         JobId jobId = hcc.startJob(job);
-        hcc.waitForCompletion(jobId);
+        hcc.waitForCompletion(jobId, Long.MAX_VALUE);
         long end = System.currentTimeMillis();
         System.err.println(start + " " + end + " " + (end - start));
     }
@@ -145,7 +145,8 @@
                                                  // to field 0 of B-Tree tuple,
                                                  // etc.
         IFileSplitProvider btreeSplitProvider = 
JobHelper.createFileSplitProvider(splitNCs, options.btreeName);
-        IIndexDataflowHelperFactory dataflowHelperFactory = new 
IndexDataflowHelperFactory(storageManager, btreeSplitProvider);
+        IIndexDataflowHelperFactory dataflowHelperFactory =
+                new IndexDataflowHelperFactory(storageManager, 
btreeSplitProvider);
         TreeIndexBulkLoadOperatorDescriptor btreeBulkLoad = new 
TreeIndexBulkLoadOperatorDescriptor(spec, recDesc,
                 fieldPermutation, 0.7f, false, 1000L, true, 
dataflowHelperFactory);
 
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/org/apache/hyracks/examples/btree/client/PrimaryIndexSearchExample.java b/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/org/apache/hyracks/examples/btree/client/PrimaryIndexSearchExample.java
index 603dc6b..bf3b41e 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/org/apache/hyracks/examples/btree/client/PrimaryIndexSearchExample.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/org/apache/hyracks/examples/btree/client/PrimaryIndexSearchExample.java
@@ -80,7 +80,7 @@
 
         long start = System.currentTimeMillis();
         JobId jobId = hcc.startJob(job);
-        hcc.waitForCompletion(jobId);
+        hcc.waitForCompletion(jobId, Long.MAX_VALUE);
         long end = System.currentTimeMillis();
         System.err.println(start + " " + end + " " + (end - start));
     }
@@ -139,7 +139,8 @@
                                      // into search op
 
         IFileSplitProvider btreeSplitProvider = 
JobHelper.createFileSplitProvider(splitNCs, options.btreeName);
-        IIndexDataflowHelperFactory dataflowHelperFactory = new 
IndexDataflowHelperFactory(storageManager, btreeSplitProvider);
+        IIndexDataflowHelperFactory dataflowHelperFactory =
+                new IndexDataflowHelperFactory(storageManager, 
btreeSplitProvider);
         BTreeSearchOperatorDescriptor btreeSearchOp = new 
BTreeSearchOperatorDescriptor(spec, recDesc, lowKeyFields,
                 highKeyFields, true, true, dataflowHelperFactory, false, 
false, null,
                 NoOpOperationCallbackFactory.INSTANCE, null, null, false);
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/org/apache/hyracks/examples/btree/client/SecondaryIndexBulkLoadExample.java b/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/org/apache/hyracks/examples/btree/client/SecondaryIndexBulkLoadExample.java
index 7507f10..52a0ace 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/org/apache/hyracks/examples/btree/client/SecondaryIndexBulkLoadExample.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/org/apache/hyracks/examples/btree/client/SecondaryIndexBulkLoadExample.java
@@ -83,7 +83,7 @@
 
         long start = System.currentTimeMillis();
         JobId jobId = hcc.startJob(job);
-        hcc.waitForCompletion(jobId);
+        hcc.waitForCompletion(jobId, Long.MAX_VALUE);
         long end = System.currentTimeMillis();
         System.err.println(start + " " + end + " " + (end - start));
     }
@@ -117,7 +117,8 @@
 
         // use a disk-order scan to read primary index
         IFileSplitProvider primarySplitProvider = 
JobHelper.createFileSplitProvider(splitNCs, options.primaryBTreeName);
-        IIndexDataflowHelperFactory primaryHelperFactory = new 
IndexDataflowHelperFactory(storageManager, primarySplitProvider);
+        IIndexDataflowHelperFactory primaryHelperFactory =
+                new IndexDataflowHelperFactory(storageManager, 
primarySplitProvider);
         TreeIndexDiskOrderScanOperatorDescriptor btreeScanOp = new 
TreeIndexDiskOrderScanOperatorDescriptor(spec,
                 recDesc, primaryHelperFactory, 
NoOpOperationCallbackFactory.INSTANCE);
         JobHelper.createPartitionConstraint(spec, btreeScanOp, splitNCs);
@@ -139,7 +140,8 @@
         // tuple
         int[] fieldPermutation = { 1, 0 };
         IFileSplitProvider btreeSplitProvider = 
JobHelper.createFileSplitProvider(splitNCs, options.secondaryBTreeName);
-        IIndexDataflowHelperFactory secondaryHelperFactory = new 
IndexDataflowHelperFactory(storageManager, btreeSplitProvider);
+        IIndexDataflowHelperFactory secondaryHelperFactory =
+                new IndexDataflowHelperFactory(storageManager, 
btreeSplitProvider);
         TreeIndexBulkLoadOperatorDescriptor btreeBulkLoad = new 
TreeIndexBulkLoadOperatorDescriptor(spec, null,
                 fieldPermutation, 0.7f, false, 1000L, true, 
secondaryHelperFactory);
         JobHelper.createPartitionConstraint(spec, btreeBulkLoad, splitNCs);
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/org/apache/hyracks/examples/btree/client/SecondaryIndexSearchExample.java b/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/org/apache/hyracks/examples/btree/client/SecondaryIndexSearchExample.java
index 1e909ef..5d72703 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/org/apache/hyracks/examples/btree/client/SecondaryIndexSearchExample.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/org/apache/hyracks/examples/btree/client/SecondaryIndexSearchExample.java
@@ -83,7 +83,7 @@
 
         long start = System.currentTimeMillis();
         JobId jobId = hcc.startJob(job);
-        hcc.waitForCompletion(jobId);
+        hcc.waitForCompletion(jobId, Long.MAX_VALUE);
         long end = System.currentTimeMillis();
         System.err.println(start + " " + end + " " + (end - start));
     }
@@ -183,7 +183,8 @@
                                             // op
 
         IFileSplitProvider primarySplitProvider = 
JobHelper.createFileSplitProvider(splitNCs, options.primaryBTreeName);
-        IIndexDataflowHelperFactory primaryHelperFactory = new 
IndexDataflowHelperFactory(storageManager, primarySplitProvider);
+        IIndexDataflowHelperFactory primaryHelperFactory =
+                new IndexDataflowHelperFactory(storageManager, 
primarySplitProvider);
         BTreeSearchOperatorDescriptor primarySearchOp = new 
BTreeSearchOperatorDescriptor(spec, primaryRecDesc,
                 primaryLowKeyFields, primaryHighKeyFields, true, true, 
primaryHelperFactory, false, false, null,
                 NoOpOperationCallbackFactory.INSTANCE, null, null, false);
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractIntegrationTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractIntegrationTest.java
index 82fd737..48ca36f 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractIntegrationTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractIntegrationTest.java
@@ -106,7 +106,7 @@
         ncConfig1.setClusterListenAddress("127.0.0.1");
         ncConfig1.setDataListenAddress("127.0.0.1");
         ncConfig1.setResultListenAddress("127.0.0.1");
-        ncConfig1.setIODevices(new String [] { 
joinPath(System.getProperty("user.dir"), "target", "data", "device0") });
+        ncConfig1.setIODevices(new String[] { 
joinPath(System.getProperty("user.dir"), "target", "data", "device0") });
         nc1 = new NodeControllerService(ncConfig1);
         nc1.start();
 
@@ -116,7 +116,7 @@
         ncConfig2.setClusterListenAddress("127.0.0.1");
         ncConfig2.setDataListenAddress("127.0.0.1");
         ncConfig2.setResultListenAddress("127.0.0.1");
-        ncConfig2.setIODevices(new String [] { 
joinPath(System.getProperty("user.dir"), "target", "data", "device1") });
+        ncConfig2.setIODevices(new String[] { 
joinPath(System.getProperty("user.dir"), "target", "data", "device1") });
         nc2 = new NodeControllerService(ncConfig2);
         nc2.start();
 
@@ -146,7 +146,7 @@
 
     protected void runTest(JobSpecification spec) throws Exception {
         JobId jobId = executeTest(spec);
-        hcc.waitForCompletion(jobId);
+        hcc.waitForCompletion(jobId, Long.MAX_VALUE);
     }
 
     protected List<String> readResults(JobSpecification spec, JobId jobId, 
ResultSetId resultSetId) throws Exception {
@@ -209,7 +209,7 @@
             expectedFile.close();
         }
 
-        hcc.waitForCompletion(jobId);
+        hcc.waitForCompletion(jobId, Long.MAX_VALUE);
         return true;
     }
 
@@ -226,7 +226,7 @@
         }
         output.close();
 
-        hcc.waitForCompletion(jobId);
+        hcc.waitForCompletion(jobId, Long.MAX_VALUE);
     }
 
     protected FileSplit createFile(NodeControllerService ncs) throws 
IOException {
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
index 18479e2..1c0220d 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
@@ -127,7 +127,7 @@
     }
 
     protected void waitForCompletion(JobId jobId) throws Exception {
-        hcc.waitForCompletion(jobId);
+        hcc.waitForCompletion(jobId, Long.MAX_VALUE);
     }
 
     protected JobStatus getJobStatus(JobId jobId) throws Exception {
@@ -188,7 +188,7 @@
         }
         boolean expectedExceptionThrown = false;
         try {
-            hcc.waitForCompletion(jobId);
+            hcc.waitForCompletion(jobId, Long.MAX_VALUE);
         } catch (HyracksDataException hde) {
             if (expectedErrorMessage != null) {
                 if (hde.toString().contains(expectedErrorMessage)) {
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/JobStatusAPIIntegrationTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/JobStatusAPIIntegrationTest.java
index f2f8061..c4636fb 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/JobStatusAPIIntegrationTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/JobStatusAPIIntegrationTest.java
@@ -104,7 +104,7 @@
             WaitingOperatorDescriptor.CONTINUE_RUNNING.setTrue();
             WaitingOperatorDescriptor.CONTINUE_RUNNING.notify();
         }
-        hcc.waitForCompletion(jId);
+        hcc.waitForCompletion(jId, Long.MAX_VALUE);
     }
 
     private int countJobs(String status) throws IOException, 
URISyntaxException {
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/PredistributedJobsTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/PredistributedJobsTest.java
index 4a01fdb..0c27b3d 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/PredistributedJobsTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/PredistributedJobsTest.java
@@ -79,7 +79,7 @@
         ncConfig1.setDataListenAddress("127.0.0.1");
         ncConfig1.setResultListenAddress("127.0.0.1");
         ncConfig1.setResultSweepThreshold(5000);
-        ncConfig1.setIODevices(new String [] { 
joinPath(System.getProperty("user.dir"), "target", "data", "device0") });
+        ncConfig1.setIODevices(new String[] { 
joinPath(System.getProperty("user.dir"), "target", "data", "device0") });
         NodeControllerService nc1Base = new NodeControllerService(ncConfig1);
         nc1 = Mockito.spy(nc1Base);
         nc1.start();
@@ -91,7 +91,7 @@
         ncConfig2.setDataListenAddress("127.0.0.1");
         ncConfig2.setResultListenAddress("127.0.0.1");
         ncConfig2.setResultSweepThreshold(5000);
-        ncConfig2.setIODevices(new String [] { 
joinPath(System.getProperty("user.dir"), "target", "data", "device1") });
+        ncConfig2.setIODevices(new String[] { 
joinPath(System.getProperty("user.dir"), "target", "data", "device1") });
         NodeControllerService nc2Base = new NodeControllerService(ncConfig2);
         nc2 = Mockito.spy(nc2Base);
         nc2.start();
@@ -127,7 +127,7 @@
 
         //run the first job
         hcc.startJob(jobId1);
-        hcc.waitForCompletion(jobId1);
+        hcc.waitForCompletion(jobId1, Long.MAX_VALUE);
 
         //destroy the first job
         hcc.destroyJob(jobId1);
@@ -143,7 +143,7 @@
 
         //run the second job
         hcc.startJob(jobId2);
-        hcc.waitForCompletion(jobId2);
+        hcc.waitForCompletion(jobId2, Long.MAX_VALUE);
 
         //wait ten seconds to ensure the result sweeper does not break the job
         //The result sweeper runs every 5 seconds during the tests
@@ -151,7 +151,7 @@
 
         //run the second job again
         hcc.startJob(jobId2);
-        hcc.waitForCompletion(jobId2);
+        hcc.waitForCompletion(jobId2, Long.MAX_VALUE);
 
         //destroy the second job
         hcc.destroyJob(jobId2);
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/text-example/textclient/src/main/java/org/apache/hyracks/examples/text/client/WordCountMain.java b/hyracks-fullstack/hyracks/hyracks-examples/text-example/textclient/src/main/java/org/apache/hyracks/examples/text/client/WordCountMain.java
index 650c60d..d31be22 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/text-example/textclient/src/main/java/org/apache/hyracks/examples/text/client/WordCountMain.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/text-example/textclient/src/main/java/org/apache/hyracks/examples/text/client/WordCountMain.java
@@ -113,7 +113,7 @@
         long start = System.currentTimeMillis();
         JobId jobId = hcc.startJob(job,
                 options.runtimeProfiling ? EnumSet.of(JobFlag.PROFILE_RUNTIME) 
: EnumSet.noneOf(JobFlag.class));
-        hcc.waitForCompletion(jobId);
+        hcc.waitForCompletion(jobId, Long.MAX_VALUE);
         long end = System.currentTimeMillis();
         System.err.println(start + " " + end + " " + (end - start));
     }
@@ -139,11 +139,11 @@
         JobSpecification spec = new JobSpecification(frameSize);
 
         IFileSplitProvider splitsProvider = new 
ConstantFileSplitProvider(inSplits);
-        RecordDescriptor wordDesc = new RecordDescriptor(
-                new ISerializerDeserializer[] { new 
UTF8StringSerializerDeserializer() });
+        RecordDescriptor wordDesc =
+                new RecordDescriptor(new ISerializerDeserializer[] { new 
UTF8StringSerializerDeserializer() });
 
-        FileScanOperatorDescriptor wordScanner = new 
FileScanOperatorDescriptor(spec, splitsProvider,
-                new WordTupleParserFactory(), wordDesc);
+        FileScanOperatorDescriptor wordScanner =
+                new FileScanOperatorDescriptor(spec, splitsProvider, new 
WordTupleParserFactory(), wordDesc);
         createPartitionConstraint(spec, wordScanner, inSplits);
 
         RecordDescriptor groupResultDesc = new RecordDescriptor(new 
ISerializerDeserializer[] {
@@ -170,8 +170,8 @@
                             
PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }));
             spec.connect(scanGroupConn, wordScanner, 0, gBy, 0);
         } else {
-            IBinaryComparatorFactory[] cfs = new IBinaryComparatorFactory[] {
-                    
PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) };
+            IBinaryComparatorFactory[] cfs =
+                    new IBinaryComparatorFactory[] { 
PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) };
             IOperatorDescriptor sorter = "memsort".equalsIgnoreCase(algo)
                     ? new InMemorySortOperatorDescriptor(spec, keys, new 
UTF8StringNormalizedKeyComputerFactory(), cfs,
                             wordDesc)
@@ -195,9 +195,9 @@
         }
 
         IFileSplitProvider outSplitProvider = new 
ConstantFileSplitProvider(outSplits);
-        IOperatorDescriptor writer = "text".equalsIgnoreCase(format)
-                ? new PlainFileWriterOperatorDescriptor(spec, 
outSplitProvider, ",")
-                : new FrameFileWriterOperatorDescriptor(spec, 
outSplitProvider);
+        IOperatorDescriptor writer =
+                "text".equalsIgnoreCase(format) ? new 
PlainFileWriterOperatorDescriptor(spec, outSplitProvider, ",")
+                        : new FrameFileWriterOperatorDescriptor(spec, 
outSplitProvider);
         createPartitionConstraint(spec, writer, outSplits);
 
         IConnectorDescriptor gbyPrinterConn = new 
OneToOneConnectorDescriptor(spec);
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Groupby.java b/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Groupby.java
index 42fe8c9..8a86692 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Groupby.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Groupby.java
@@ -118,7 +118,7 @@
             System.out.print("CreateJobTime:" + (System.currentTimeMillis() - 
start));
             start = System.currentTimeMillis();
             JobId jobId = hcc.startJob(job);
-            hcc.waitForCompletion(jobId);
+            hcc.waitForCompletion(jobId, Long.MAX_VALUE);
             System.out.println("JobExecuteTime:" + (System.currentTimeMillis() 
- start));
         }
     }
@@ -134,8 +134,8 @@
         createPartitionConstraint(spec, fileScanner, inSplits);
 
         // Output: each unique string with an integer count
-        RecordDescriptor outDesc = new RecordDescriptor(
-                new ISerializerDeserializer[] { 
IntegerSerializerDeserializer.INSTANCE,
+        RecordDescriptor outDesc =
+                new RecordDescriptor(new ISerializerDeserializer[] { 
IntegerSerializerDeserializer.INSTANCE,
                         // IntegerSerializerDeserializer.INSTANCE,
                         IntegerSerializerDeserializer.INSTANCE });
 
@@ -187,9 +187,9 @@
         spec.connect(scanGroupConnDef2, fileScanner, 0, grouper, 0);
 
         IFileSplitProvider outSplitProvider = new 
ConstantFileSplitProvider(outSplits);
-        AbstractSingleActivityOperatorDescriptor writer = outPlain ? new 
PlainFileWriterOperatorDescriptor(spec,
-                outSplitProvider, "|")
-                : new FrameFileWriterOperatorDescriptor(spec, 
outSplitProvider);
+        AbstractSingleActivityOperatorDescriptor writer =
+                outPlain ? new PlainFileWriterOperatorDescriptor(spec, 
outSplitProvider, "|")
+                        : new FrameFileWriterOperatorDescriptor(spec, 
outSplitProvider);
         createPartitionConstraint(spec, writer, outSplits);
         IConnectorDescriptor groupOutConn = new 
OneToOneConnectorDescriptor(spec);
         spec.connect(groupOutConn, grouper, 0, writer, 0);
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Join.java b/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Join.java
index c3d0df1..2a9f351 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Join.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Join.java
@@ -140,7 +140,7 @@
         long start = System.currentTimeMillis();
         JobId jobId = hcc.startJob(job,
                 options.profile ? EnumSet.of(JobFlag.PROFILE_RUNTIME) : 
EnumSet.noneOf(JobFlag.class));
-        hcc.waitForCompletion(jobId);
+        hcc.waitForCompletion(jobId, Long.MAX_VALUE);
         long end = System.currentTimeMillis();
         System.err.println(start + " " + end + " " + (end - start));
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Sort.java b/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Sort.java
index 8ab0708..347ade5 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Sort.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Sort.java
@@ -89,13 +89,13 @@
     }
 
     static int[] SortFields = new int[] { 1, 0 };
-    static IBinaryComparatorFactory[] SortFieldsComparatorFactories = new 
IBinaryComparatorFactory[] {
-            PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY),
-            PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) };
+    static IBinaryComparatorFactory[] SortFieldsComparatorFactories =
+            new IBinaryComparatorFactory[] { 
PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY),
+                    
PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) };
 
-    static IBinaryHashFunctionFactory[] orderBinaryHashFunctionFactories = new 
IBinaryHashFunctionFactory[] {
-            PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY),
-            PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) 
};
+    static IBinaryHashFunctionFactory[] orderBinaryHashFunctionFactories =
+            new IBinaryHashFunctionFactory[] { 
PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY),
+                    
PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) };
 
     public static void main(String[] args) throws Exception {
         Options options = new Options();
@@ -109,13 +109,13 @@
         IHyracksClientConnection hcc = new HyracksConnection(options.host, 
options.port);
 
         JobSpecification job = 
createJob(parseFileSplits(options.inFileOrderSplits),
-                parseFileSplits(options.outFileSplits),
-                options.memBufferAlg, options.frameLimit, options.frameSize, 
options.topK, options.usingHeapSorter);
+                parseFileSplits(options.outFileSplits), options.memBufferAlg, 
options.frameLimit, options.frameSize,
+                options.topK, options.usingHeapSorter);
 
         long start = System.currentTimeMillis();
         JobId jobId = hcc.startJob(job,
                 options.profile ? EnumSet.of(JobFlag.PROFILE_RUNTIME) : 
EnumSet.noneOf(JobFlag.class));
-        hcc.waitForCompletion(jobId);
+        hcc.waitForCompletion(jobId, Long.MAX_VALUE);
         long end = System.currentTimeMillis();
         System.err.println("finished in:" + (end - start) + "ms");
     }
@@ -135,8 +135,8 @@
                     SortFieldsComparatorFactories, ordersDesc);
         } else {
             if (memBufferAlg.equalsIgnoreCase("bestfit")) {
-                sorter = new ExternalSortOperatorDescriptor(spec, frameLimit, 
SortFields,
-                        null, SortFieldsComparatorFactories, ordersDesc, 
Algorithm.MERGE_SORT,
+                sorter = new ExternalSortOperatorDescriptor(spec, frameLimit, 
SortFields, null,
+                        SortFieldsComparatorFactories, ordersDesc, 
Algorithm.MERGE_SORT,
                         EnumFreeSlotPolicy.SMALLEST_FIT, limit);
             } else if (memBufferAlg.equalsIgnoreCase("biggestfit")) {
                 sorter = new ExternalSortOperatorDescriptor(spec, frameLimit, 
SortFields, null,
@@ -158,8 +158,8 @@
 
         spec.connect(
                 new MToNPartitioningMergingConnectorDescriptor(spec,
-                        new FieldHashPartitionComputerFactory(SortFields, 
orderBinaryHashFunctionFactories),
-                        SortFields, SortFieldsComparatorFactories, new 
UTF8StringNormalizedKeyComputerFactory()),
+                        new FieldHashPartitionComputerFactory(SortFields, 
orderBinaryHashFunctionFactories), SortFields,
+                        SortFieldsComparatorFactories, new 
UTF8StringNormalizedKeyComputerFactory()),
                 sorter, 0, printer, 0);
 
         spec.addRoot(printer);
diff --git a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs/dataflow/DataflowTest.java b/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs/dataflow/DataflowTest.java
index 0d0cd3e..b0aad4f 100644
--- a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs/dataflow/DataflowTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs/dataflow/DataflowTest.java
@@ -73,7 +73,6 @@
     private static final String PATH_TO_HADOOP_CONF = 
FileUtil.joinPath(TEST_RESOURCES, "hadoop", "conf");
     protected static final String BUILD_DIR = FileUtil.joinPath("target", 
"build");
 
-
     private static final String DATA_PATH = FileUtil.joinPath(TEST_RESOURCES, 
"data", "customer.tbl");
     protected static final String HDFS_INPUT_PATH = "/customer/";
     protected static final String HDFS_OUTPUT_PATH = "/customer_result/";
@@ -151,11 +150,11 @@
 
         String[] readSchedule = scheduler.getLocationConstraints(splits);
         JobSpecification jobSpec = new JobSpecification();
-        RecordDescriptor recordDesc = new RecordDescriptor(
-                new ISerializerDeserializer[] { new 
UTF8StringSerializerDeserializer() });
+        RecordDescriptor recordDesc =
+                new RecordDescriptor(new ISerializerDeserializer[] { new 
UTF8StringSerializerDeserializer() });
 
-        String[] locations = new String[] { HyracksUtils.NC1_ID, 
HyracksUtils.NC1_ID, HyracksUtils.NC2_ID,
-                HyracksUtils.NC2_ID };
+        String[] locations =
+                new String[] { HyracksUtils.NC1_ID, HyracksUtils.NC1_ID, 
HyracksUtils.NC2_ID, HyracksUtils.NC2_ID };
         HDFSReadOperatorDescriptor readOperator = new 
HDFSReadOperatorDescriptor(jobSpec, recordDesc, conf, splits,
                 readSchedule, new TextKeyValueParserFactory());
         PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, 
readOperator, locations);
@@ -164,21 +163,23 @@
                 new IBinaryComparatorFactory[] { 
RawBinaryComparatorFactory.INSTANCE }, recordDesc);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, 
sortOperator, locations);
 
-        HDFSWriteOperatorDescriptor writeOperator = new 
HDFSWriteOperatorDescriptor(jobSpec, conf,
-                new TextTupleWriterFactory());
+        HDFSWriteOperatorDescriptor writeOperator =
+                new HDFSWriteOperatorDescriptor(jobSpec, conf, new 
TextTupleWriterFactory());
         PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, 
writeOperator, HyracksUtils.NC1_ID);
 
         jobSpec.connect(new OneToOneConnectorDescriptor(jobSpec), 
readOperator, 0, sortOperator, 0);
-        jobSpec.connect(new 
MToNPartitioningMergingConnectorDescriptor(jobSpec, new 
FieldHashPartitionComputerFactory(
-                new int[] { 0 }, new IBinaryHashFunctionFactory[] { 
RawBinaryHashFunctionFactory.INSTANCE }),
-                new int[] { 0 }, new IBinaryComparatorFactory[] { 
RawBinaryComparatorFactory.INSTANCE }, null),
+        jobSpec.connect(
+                new MToNPartitioningMergingConnectorDescriptor(jobSpec,
+                        new FieldHashPartitionComputerFactory(new int[] { 0 },
+                                new IBinaryHashFunctionFactory[] { 
RawBinaryHashFunctionFactory.INSTANCE }),
+                        new int[] { 0 }, new IBinaryComparatorFactory[] { 
RawBinaryComparatorFactory.INSTANCE }, null),
                 sortOperator, 0, writeOperator, 0);
         jobSpec.addRoot(writeOperator);
 
-        IHyracksClientConnection client = new 
HyracksConnection(HyracksUtils.CC_HOST,
-                HyracksUtils.TEST_HYRACKS_CC_CLIENT_PORT);
+        IHyracksClientConnection client =
+                new HyracksConnection(HyracksUtils.CC_HOST, 
HyracksUtils.TEST_HYRACKS_CC_CLIENT_PORT);
         JobId jobId = client.startJob(jobSpec);
-        client.waitForCompletion(jobId);
+        client.waitForCompletion(jobId, Long.MAX_VALUE);
 
         Assert.assertEquals(true, checkResults());
     }
@@ -195,8 +196,8 @@
         Path actual = new Path(ACTUAL_RESULT_DIR);
         dfs.copyToLocalFile(result, actual);
 
-        TestUtils.compareWithResult(new 
File(FileUtil.joinPath(EXPECTED_RESULT_PATH, "part-0")), new File(
-                FileUtil.joinPath(ACTUAL_RESULT_DIR, "customer_result", 
"part-0")));
+        TestUtils.compareWithResult(new 
File(FileUtil.joinPath(EXPECTED_RESULT_PATH, "part-0")),
+                new File(FileUtil.joinPath(ACTUAL_RESULT_DIR, 
"customer_result", "part-0")));
         return true;
     }
 
diff --git 
a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs/utils/HyracksUtils.java
 
b/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs/utils/HyracksUtils.java
index 1fddc46..287baeb 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs/utils/HyracksUtils.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs/utils/HyracksUtils.java
@@ -90,10 +90,10 @@
         cc.stop();
     }
 
-    public static void runJob(JobSpecification spec, String appName) throws 
Exception {
+    public static void runJob(JobSpecification spec, String appName, long 
timeout) throws Exception {
         spec.setFrameSize(FRAME_SIZE);
         JobId jobId = hcc.startJob(spec, EnumSet.of(JobFlag.PROFILE_RUNTIME));
-        hcc.waitForCompletion(jobId);
+        hcc.waitForCompletion(jobId, timeout);
     }
 
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs2/dataflow/DataflowTest.java
 
b/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs2/dataflow/DataflowTest.java
index 3c9b1c0..84cdb65 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs2/dataflow/DataflowTest.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs2/dataflow/DataflowTest.java
@@ -21,7 +21,6 @@
 
 import java.util.List;
 
-import junit.framework.Assert;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
@@ -55,6 +54,8 @@
 import org.apache.hyracks.hdfs.utils.HyracksUtils;
 import org.apache.hyracks.hdfs2.scheduler.Scheduler;
 
+import junit.framework.Assert;
+
 /**
  * Test the org.apache.hyracks.hdfs2.dataflow package,
  * the operators for the Hadoop new API.
@@ -86,6 +87,7 @@
      *
      * @throws Exception
      */
+    @Override
     @SuppressWarnings({ "rawtypes", "unchecked" })
     public void testHDFSReadWriteOperators() throws Exception {
         FileInputFormat.setInputPaths(conf, HDFS_INPUT_PATH);
@@ -98,11 +100,11 @@
 
         String[] readSchedule = scheduler.getLocationConstraints(splits);
         JobSpecification jobSpec = new JobSpecification();
-        RecordDescriptor recordDesc = new RecordDescriptor(
-                new ISerializerDeserializer[] { new 
UTF8StringSerializerDeserializer() });
+        RecordDescriptor recordDesc =
+                new RecordDescriptor(new ISerializerDeserializer[] { new 
UTF8StringSerializerDeserializer() });
 
-        String[] locations = new String[] { HyracksUtils.NC1_ID, 
HyracksUtils.NC1_ID, HyracksUtils.NC2_ID,
-                HyracksUtils.NC2_ID };
+        String[] locations =
+                new String[] { HyracksUtils.NC1_ID, HyracksUtils.NC1_ID, 
HyracksUtils.NC2_ID, HyracksUtils.NC2_ID };
         HDFSReadOperatorDescriptor readOperator = new 
HDFSReadOperatorDescriptor(jobSpec, recordDesc, conf, splits,
                 readSchedule, new TextKeyValueParserFactory());
         PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, 
readOperator, locations);
@@ -111,21 +113,23 @@
                 new IBinaryComparatorFactory[] { 
RawBinaryComparatorFactory.INSTANCE }, recordDesc);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, 
sortOperator, locations);
 
-        HDFSWriteOperatorDescriptor writeOperator = new 
HDFSWriteOperatorDescriptor(jobSpec, conf,
-                new TextTupleWriterFactory());
+        HDFSWriteOperatorDescriptor writeOperator =
+                new HDFSWriteOperatorDescriptor(jobSpec, conf, new 
TextTupleWriterFactory());
         PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, 
writeOperator, HyracksUtils.NC1_ID);
 
         jobSpec.connect(new OneToOneConnectorDescriptor(jobSpec), 
readOperator, 0, sortOperator, 0);
-        jobSpec.connect(new 
MToNPartitioningMergingConnectorDescriptor(jobSpec, new 
FieldHashPartitionComputerFactory(
-                new int[] { 0 }, new IBinaryHashFunctionFactory[] { 
RawBinaryHashFunctionFactory.INSTANCE }),
-                new int[] { 0 }, new IBinaryComparatorFactory[] { 
RawBinaryComparatorFactory.INSTANCE }, null),
+        jobSpec.connect(
+                new MToNPartitioningMergingConnectorDescriptor(jobSpec,
+                        new FieldHashPartitionComputerFactory(new int[] { 0 },
+                                new IBinaryHashFunctionFactory[] { 
RawBinaryHashFunctionFactory.INSTANCE }),
+                        new int[] { 0 }, new IBinaryComparatorFactory[] { 
RawBinaryComparatorFactory.INSTANCE }, null),
                 sortOperator, 0, writeOperator, 0);
         jobSpec.addRoot(writeOperator);
 
-        IHyracksClientConnection client = new 
HyracksConnection(HyracksUtils.CC_HOST,
-                HyracksUtils.TEST_HYRACKS_CC_CLIENT_PORT);
+        IHyracksClientConnection client =
+                new HyracksConnection(HyracksUtils.CC_HOST, 
HyracksUtils.TEST_HYRACKS_CC_CLIENT_PORT);
         JobId jobId = client.startJob(jobSpec);
-        client.waitForCompletion(jobId);
+        client.waitForCompletion(jobId, Long.MAX_VALUE);
 
         Assert.assertEquals(true, checkResults());
     }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
 
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
index 45634ad..dfd5bad 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
@@ -260,7 +260,7 @@
         return b && (path.length() == cpl || '/' == path.charAt(cpl));
     }
 
-    protected HttpServerHandler<HttpServer> createHttpHandler(int chunkSize) {
+    protected HttpServerHandler<? extends HttpServer> createHttpHandler(int chunkSize) {
         return new HttpServerHandler<>(this, chunkSize);
     }
 
diff --git 
a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/RPCInterface.java
 
b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/RPCInterface.java
index aebe2f5..3c0f699 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/RPCInterface.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/RPCInterface.java
@@ -20,6 +20,7 @@
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.TimeoutException;
 
 import org.apache.hyracks.ipc.exceptions.IPCException;
 
@@ -27,10 +28,10 @@
     private final Map<Long, Request> reqMap;
 
     public RPCInterface() {
-        reqMap = new HashMap<Long, RPCInterface.Request>();
+        reqMap = new HashMap<>();
     }
 
-    public Object call(IIPCHandle handle, Object request) throws Exception {
+    public Object call(IIPCHandle handle, Object request, long timeout) throws 
Exception {
         Request req;
         long mid;
         synchronized (this) {
@@ -41,7 +42,7 @@
             mid = handle.send(-1, request, null);
             reqMap.put(mid, req);
         }
-        return req.getResponse();
+        return req.getResponse(timeout);
     }
 
     @Override
@@ -88,10 +89,16 @@
             notifyAll();
         }
 
-        synchronized Object getResponse() throws Exception {
-            while (pending) {
+        synchronized Object getResponse(long timeout) throws Exception {
+            long start = System.currentTimeMillis();
+            long now = start;
+            while (pending && now - start < timeout) {
-                wait();
+                wait(timeout - (now - start));
+                now = System.currentTimeMillis();
             }
+            if (pending) {
+                throw new TimeoutException();
+            }
             if (exception != null) {
                 throw exception;
             }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-ipc/src/test/java/org/apache/hyracks/ipc/tests/IPCTest.java
 
b/hyracks-fullstack/hyracks/hyracks-ipc/src/test/java/org/apache/hyracks/ipc/tests/IPCTest.java
index b454520..1125c64 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-ipc/src/test/java/org/apache/hyracks/ipc/tests/IPCTest.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-ipc/src/test/java/org/apache/hyracks/ipc/tests/IPCTest.java
@@ -23,16 +23,15 @@
 import java.util.concurrent.Executor;
 import java.util.concurrent.Executors;
 
-import junit.framework.Assert;
-
-import org.junit.Test;
-
 import org.apache.hyracks.ipc.api.IIPCHandle;
 import org.apache.hyracks.ipc.api.IIPCI;
 import org.apache.hyracks.ipc.api.RPCInterface;
 import org.apache.hyracks.ipc.exceptions.IPCException;
 import org.apache.hyracks.ipc.impl.IPCSystem;
 import 
org.apache.hyracks.ipc.impl.JavaSerializationBasedPayloadSerializerDeserializer;
+import org.junit.Test;
+
+import junit.framework.Assert;
 
 public class IPCTest {
     @Test
@@ -48,11 +47,11 @@
         IIPCHandle handle = client.getHandle(serverAddr);
 
         for (int i = 0; i < 100; ++i) {
-            Assert.assertEquals(rpci.call(handle, Integer.valueOf(i)), 
Integer.valueOf(2 * i));
+            Assert.assertEquals(rpci.call(handle, Integer.valueOf(i), 
Long.MAX_VALUE), Integer.valueOf(2 * i));
         }
 
         try {
-            rpci.call(handle, "Foo");
+            rpci.call(handle, "Foo", Long.MAX_VALUE);
             Assert.assertTrue(false);
         } catch (Exception e) {
             Assert.assertTrue(true);
@@ -63,8 +62,8 @@
         final Executor executor = Executors.newCachedThreadPool();
         IIPCI ipci = new IIPCI() {
             @Override
-            public void deliverIncomingMessage(final IIPCHandle handle, final 
long mid, long rmid,
-                    final Object payload, Exception exception) {
+            public void deliverIncomingMessage(final IIPCHandle handle, final 
long mid, long rmid, final Object payload,
+                    Exception exception) {
                 executor.execute(new Runnable() {
                     @Override
                     public void run() {

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/1961
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I14b4bbd512cc88e489254d8bf82edba0fd3a3db5
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <bamou...@gmail.com>

Reply via email to