Repository: asterixdb
Updated Branches:
  refs/heads/master 7a0ba78d3 -> 4671f7127


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4671f712/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/DefaultStatementExecutorFactory.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/DefaultStatementExecutorFactory.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/DefaultStatementExecutorFactory.java
index e2f17ac..00fb1b0 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/DefaultStatementExecutorFactory.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/DefaultStatementExecutorFactory.java
@@ -28,7 +28,7 @@ import 
org.apache.asterix.compiler.provider.ILangCompilationProvider;
 import org.apache.asterix.lang.common.base.Statement;
 import org.apache.asterix.translator.IStatementExecutor;
 import org.apache.asterix.translator.IStatementExecutorFactory;
-import org.apache.asterix.translator.SessionConfig;
+import org.apache.asterix.translator.SessionOutput;
 import org.apache.hyracks.control.common.utils.HyracksThreadFactory;
 
 public class DefaultStatementExecutorFactory implements 
IStatementExecutorFactory {
@@ -48,9 +48,9 @@ public class DefaultStatementExecutorFactory implements 
IStatementExecutorFactor
     }
 
     @Override
-    public IStatementExecutor create(ICcApplicationContext appCtx, 
List<Statement> statements, SessionConfig conf,
+    public IStatementExecutor create(ICcApplicationContext appCtx, 
List<Statement> statements, SessionOutput output,
             ILangCompilationProvider compilationProvider, 
IStorageComponentProvider storageComponentProvider) {
-        return new QueryTranslator(appCtx, statements, conf, 
compilationProvider, storageComponentProvider,
+        return new QueryTranslator(appCtx, statements, output, 
compilationProvider, storageComponentProvider,
                 executorService);
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4671f712/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index d37de0d..e0648ce 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -161,6 +161,7 @@ import 
org.apache.asterix.translator.CompiledStatements.ICompiledDmlStatement;
 import org.apache.asterix.translator.IStatementExecutor;
 import org.apache.asterix.translator.IStatementExecutorContext;
 import org.apache.asterix.translator.SessionConfig;
+import org.apache.asterix.translator.SessionOutput;
 import org.apache.asterix.translator.TypeTranslator;
 import org.apache.asterix.translator.util.ValidateUtil;
 import org.apache.asterix.utils.DataverseUtil;
@@ -170,6 +171,7 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.commons.lang3.mutable.MutableBoolean;
 import org.apache.commons.lang3.mutable.MutableObject;
+import org.apache.commons.lang3.tuple.Triple;
 import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.common.utils.Pair;
@@ -203,6 +205,7 @@ public class QueryTranslator extends AbstractLangTranslator 
implements IStatemen
     public static final boolean IS_DEBUG_MODE = false;// true
     protected final List<Statement> statements;
     protected final ICcApplicationContext appCtx;
+    protected final SessionOutput sessionOutput;
     protected final SessionConfig sessionConfig;
     protected Dataverse activeDataverse;
     protected final List<FunctionDecl> declaredFunctions;
@@ -211,12 +214,13 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
     protected final IStorageComponentProvider componentProvider;
     protected final ExecutorService executorService;
 
-    public QueryTranslator(ICcApplicationContext appCtx, List<Statement> 
statements, SessionConfig conf,
+    public QueryTranslator(ICcApplicationContext appCtx, List<Statement> 
statements, SessionOutput output,
             ILangCompilationProvider compliationProvider, 
IStorageComponentProvider componentProvider,
             ExecutorService executorService) {
         this.appCtx = appCtx;
         this.statements = statements;
-        this.sessionConfig = conf;
+        this.sessionOutput = output;
+        this.sessionConfig = output.config();
         this.componentProvider = componentProvider;
         declaredFunctions = getDeclaredFunctions(statements);
         apiFramework = new APIFramework(compliationProvider);
@@ -241,13 +245,14 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
 
     @Override
     public void compileAndExecute(IHyracksClientConnection hcc, 
IHyracksDataset hdc, ResultDelivery resultDelivery,
-            Stats stats) throws Exception {
-        compileAndExecute(hcc, hdc, resultDelivery, stats, null, null);
+            ResultMetadata outMetadata, Stats stats) throws Exception {
+        compileAndExecute(hcc, hdc, resultDelivery, outMetadata, stats, null, 
null);
     }
 
     @Override
     public void compileAndExecute(IHyracksClientConnection hcc, 
IHyracksDataset hdc, ResultDelivery resultDelivery,
-            Stats stats, String clientContextId, IStatementExecutorContext 
ctx) throws Exception {
+            ResultMetadata outMetadata, Stats stats, String clientContextId, 
IStatementExecutorContext ctx)
+            throws Exception {
         int resultSetIdCounter = 0;
         FileSplit outputFile = null;
         IAWriterFactory writerFactory = PrinterBasedWriterFactory.INSTANCE;
@@ -262,7 +267,7 @@ public class QueryTranslator extends AbstractLangTranslator 
implements IStatemen
         try {
             for (Statement stmt : statements) {
                 if (sessionConfig.is(SessionConfig.FORMAT_HTML)) {
-                    
sessionConfig.out().println(ApiServlet.HTML_STATEMENT_SEPARATOR);
+                    
sessionOutput.out().println(ApiServlet.HTML_STATEMENT_SEPARATOR);
                 }
                 validateOperation(appCtx, activeDataverse, stmt);
                 rewriteStatement(stmt); // Rewrite the statement's AST.
@@ -324,8 +329,8 @@ public class QueryTranslator extends AbstractLangTranslator 
implements IStatemen
                             metadataProvider.setResultAsyncMode(resultDelivery 
== ResultDelivery.ASYNC
                                     || resultDelivery == 
ResultDelivery.DEFERRED);
                         }
-                        handleInsertUpsertStatement(metadataProvider, stmt, 
hcc, hdc, resultDelivery, stats, false,
-                                clientContextId, ctx);
+                        handleInsertUpsertStatement(metadataProvider, stmt, 
hcc, hdc, resultDelivery, outMetadata,
+                                stats, false, clientContextId, ctx);
                         break;
                     case Statement.Kind.DELETE:
                         handleDeleteStatement(metadataProvider, stmt, hcc, 
false);
@@ -358,8 +363,8 @@ public class QueryTranslator extends AbstractLangTranslator 
implements IStatemen
                         metadataProvider.setResultSetId(new 
ResultSetId(resultSetIdCounter++));
                         metadataProvider.setResultAsyncMode(
                                 resultDelivery == ResultDelivery.ASYNC || 
resultDelivery == ResultDelivery.DEFERRED);
-                        handleQuery(metadataProvider, (Query) stmt, hcc, hdc, 
resultDelivery, stats, clientContextId,
-                                ctx);
+                        handleQuery(metadataProvider, (Query) stmt, hcc, hdc, 
resultDelivery, outMetadata, stats,
+                                clientContextId, ctx);
                         break;
                     case Statement.Kind.COMPACT:
                         handleCompactStatement(metadataProvider, stmt, hcc);
@@ -1694,7 +1699,7 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
             CompiledLoadFromFileStatement cls =
                     new CompiledLoadFromFileStatement(dataverseName, 
loadStmt.getDatasetName().getValue(),
                             loadStmt.getAdapter(), loadStmt.getProperties(), 
loadStmt.dataIsAlreadySorted());
-            JobSpecification spec = apiFramework.compileQuery(hcc, 
metadataProvider, null, 0, null, sessionConfig, cls);
+            JobSpecification spec = apiFramework.compileQuery(hcc, 
metadataProvider, null, 0, null, sessionOutput, cls);
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
             bActiveTxn = false;
             if (spec != null) {
@@ -1712,8 +1717,8 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
 
     public JobSpecification handleInsertUpsertStatement(MetadataProvider 
metadataProvider, Statement stmt,
             IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery 
resultDelivery,
-            IStatementExecutor.Stats stats, boolean compileOnly, String 
clientContextId, IStatementExecutorContext ctx)
-            throws Exception {
+            ResultMetadata outMetadata, Stats stats, boolean compileOnly, 
String clientContextId,
+            IStatementExecutorContext ctx) throws Exception {
         InsertStatement stmtInsertUpsert = (InsertStatement) stmt;
         String dataverseName = 
getActiveDataverse(stmtInsertUpsert.getDataverseName());
         final IMetadataLocker locker = new IMetadataLocker() {
@@ -1755,7 +1760,8 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
         }
 
         if (stmtInsertUpsert.getReturnExpression() != null) {
-            deliverResult(hcc, hdc, compiler, metadataProvider, locker, 
resultDelivery, stats, clientContextId, ctx);
+            deliverResult(hcc, hdc, compiler, metadataProvider, locker, 
resultDelivery, outMetadata, stats,
+                    clientContextId, ctx);
         } else {
             locker.lock();
             try {
@@ -1811,11 +1817,11 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
 
         // Query Rewriting (happens under the same ongoing metadata 
transaction)
         Pair<IReturningStatement, Integer> rewrittenResult =
-                apiFramework.reWriteQuery(declaredFunctions, metadataProvider, 
query, sessionConfig);
+                apiFramework.reWriteQuery(declaredFunctions, metadataProvider, 
query, sessionOutput);
 
         // Query Compilation (happens under the same ongoing metadata 
transaction)
         return apiFramework.compileQuery(clusterInfoCollector, 
metadataProvider, (Query) rewrittenResult.first,
-                rewrittenResult.second, stmt == null ? null : 
stmt.getDatasetName(), sessionConfig, stmt);
+                rewrittenResult.second, stmt == null ? null : 
stmt.getDatasetName(), sessionOutput, stmt);
     }
 
     private JobSpecification rewriteCompileInsertUpsert(IClusterInfoCollector 
clusterInfoCollector,
@@ -1824,7 +1830,7 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
 
         // Insert/upsert statement rewriting (happens under the same ongoing 
metadata transaction)
         Pair<IReturningStatement, Integer> rewrittenResult =
-                apiFramework.reWriteQuery(declaredFunctions, metadataProvider, 
insertUpsert, sessionConfig);
+                apiFramework.reWriteQuery(declaredFunctions, metadataProvider, 
insertUpsert, sessionOutput);
 
         InsertStatement rewrittenInsertUpsert = (InsertStatement) 
rewrittenResult.first;
         String dataverseName = 
getActiveDataverse(rewrittenInsertUpsert.getDataverseName());
@@ -1846,7 +1852,7 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
         }
         // Insert/upsert statement compilation (happens under the same ongoing 
metadata transaction)
         return apiFramework.compileQuery(clusterInfoCollector, 
metadataProvider, rewrittenInsertUpsert.getQuery(),
-                rewrittenResult.second, datasetName, sessionConfig, clfrqs);
+                rewrittenResult.second, datasetName, sessionOutput, clfrqs);
     }
 
     protected void handleCreateFeedStatement(MetadataProvider 
metadataProvider, Statement stmt) throws Exception {
@@ -2052,7 +2058,7 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
                 datasets.add(ds);
             }
             org.apache.commons.lang3.tuple.Pair<JobSpecification, 
AlgebricksAbsolutePartitionConstraint> jobInfo =
-                    FeedOperations.buildStartFeedJob(sessionConfig, 
metadataProvider, feed, feedConnections,
+                    FeedOperations.buildStartFeedJob(sessionOutput, 
metadataProvider, feed, feedConnections,
                             compilationProvider, storageComponentProvider, 
qtFactory, hcc);
 
             JobSpecification feedJob = jobInfo.getLeft();
@@ -2287,8 +2293,8 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
     }
 
     protected void handleQuery(MetadataProvider metadataProvider, Query query, 
IHyracksClientConnection hcc,
-            IHyracksDataset hdc, ResultDelivery resultDelivery, Stats stats, 
String clientContextId,
-            IStatementExecutorContext ctx) throws Exception {
+            IHyracksDataset hdc, ResultDelivery resultDelivery, ResultMetadata 
outMetadata, Stats stats,
+            String clientContextId, IStatementExecutorContext ctx) throws 
Exception {
         final IMetadataLocker locker = new IMetadataLocker() {
             @Override
             public void lock() {
@@ -2318,12 +2324,14 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
                 throw e;
             }
         };
-        deliverResult(hcc, hdc, compiler, metadataProvider, locker, 
resultDelivery, stats, clientContextId, ctx);
+        deliverResult(hcc, hdc, compiler, metadataProvider, locker, 
resultDelivery, outMetadata, stats, clientContextId,
+                ctx);
     }
 
     private void deliverResult(IHyracksClientConnection hcc, IHyracksDataset 
hdc, IStatementCompiler compiler,
-            MetadataProvider metadataProvider, IMetadataLocker locker, 
ResultDelivery resultDelivery, Stats stats,
-            String clientContextId, IStatementExecutorContext ctx) throws 
Exception {
+            MetadataProvider metadataProvider, IMetadataLocker locker, 
ResultDelivery resultDelivery,
+            ResultMetadata outMetadata, Stats stats, String clientContextId, 
IStatementExecutorContext ctx)
+            throws Exception {
         final ResultSetId resultSetId = metadataProvider.getResultSetId();
         switch (resultDelivery) {
             case ASYNC:
@@ -2339,13 +2347,17 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
             case IMMEDIATE:
                 createAndRunJob(hcc, null, compiler, locker, resultDelivery, 
id -> {
                     final ResultReader resultReader = new ResultReader(hdc, 
id, resultSetId);
-                    ResultUtil.printResults(appCtx, resultReader, 
sessionConfig, stats,
+                    ResultUtil.printResults(appCtx, resultReader, 
sessionOutput, stats,
                             metadataProvider.findOutputRecordType());
                 }, clientContextId, ctx);
                 break;
             case DEFERRED:
                 createAndRunJob(hcc, null, compiler, locker, resultDelivery, 
id -> {
-                    ResultUtil.printResultHandle(sessionConfig, new 
ResultHandle(id, resultSetId));
+                    ResultUtil.printResultHandle(sessionOutput, new 
ResultHandle(id, resultSetId));
+                    if (outMetadata != null) {
+                        outMetadata.getResultSets()
+                                .add(Triple.of(id, resultSetId, 
metadataProvider.findOutputRecordType()));
+                    }
                 }, clientContextId, ctx);
                 break;
             default:
@@ -2360,8 +2372,8 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
         try {
             createAndRunJob(hcc, jobId, compiler, locker, resultDelivery, id 
-> {
                 final ResultHandle handle = new ResultHandle(id, resultSetId);
-                ResultUtil.printStatus(sessionConfig, 
AbstractQueryApiServlet.ResultStatus.RUNNING);
-                ResultUtil.printResultHandle(sessionConfig, handle);
+                ResultUtil.printStatus(sessionOutput, 
AbstractQueryApiServlet.ResultStatus.RUNNING);
+                ResultUtil.printResultHandle(sessionOutput, handle);
                 synchronized (printed) {
                     printed.setTrue();
                     printed.notify();
@@ -2370,8 +2382,8 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
         } catch (Exception e) {
             if (JobId.INVALID.equals(jobId.getValue())) {
                 // compilation failed
-                ResultUtil.printStatus(sessionConfig, 
AbstractQueryApiServlet.ResultStatus.FAILED);
-                ResultUtil.printError(sessionConfig.out(), e);
+                ResultUtil.printStatus(sessionOutput, 
AbstractQueryApiServlet.ResultStatus.FAILED);
+                ResultUtil.printError(sessionOutput.out(), e);
             } else {
                 GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE,
                         resultDelivery.name() + " job with id " + 
jobId.getValue() + " " + "failed", e);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4671f712/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
index bf7d5eb..57cc340 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
@@ -19,17 +19,19 @@
 
 package org.apache.asterix.hyracks.bootstrap;
 
+import static org.apache.asterix.algebra.base.ILangExtension.Language.AQL;
+import static org.apache.asterix.algebra.base.ILangExtension.Language.SQLPP;
 import static 
org.apache.asterix.api.http.servlet.ServletConstants.ASTERIX_APP_CONTEXT_INFO_ATTR;
 import static 
org.apache.asterix.api.http.servlet.ServletConstants.HYRACKS_CONNECTION_ATTR;
 
 import java.util.Arrays;
 import java.util.List;
-import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentMap;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import org.apache.asterix.active.ActiveLifecycleListener;
+import org.apache.asterix.api.http.ctx.StatementExecutorContext;
 import org.apache.asterix.api.http.server.ApiServlet;
 import org.apache.asterix.api.http.server.ClusterApiServlet;
 import org.apache.asterix.api.http.server.ClusterControllerDetailsApiServlet;
@@ -73,6 +75,7 @@ import 
org.apache.asterix.metadata.cluster.ClusterManagerProvider;
 import org.apache.asterix.runtime.job.resource.JobCapacityController;
 import org.apache.asterix.runtime.utils.CcApplicationContext;
 import org.apache.asterix.runtime.utils.ClusterStateManager;
+import org.apache.asterix.translator.IStatementExecutorContext;
 import org.apache.asterix.translator.IStatementExecutorFactory;
 import org.apache.hyracks.api.application.ICCServiceContext;
 import org.apache.hyracks.api.application.IServiceContext;
@@ -95,6 +98,7 @@ public class CCApplication extends BaseCCApplication {
     protected ICCServiceContext ccServiceCtx;
     protected CCExtensionManager ccExtensionManager;
     protected IStorageComponentProvider componentProvider;
+    protected StatementExecutorContext statementExecutorCtx;
     protected WebManager webManager;
     protected CcApplicationContext appCtx;
     private IJobCapacityController jobCapacityController;
@@ -124,9 +128,10 @@ public class CCApplication extends BaseCCApplication {
         ExternalLibraryUtils.setUpExternaLibraries(libraryManager, false);
         componentProvider = new StorageComponentProvider();
         GlobalRecoveryManager.instantiate(ccServiceCtx, getHcc(), 
componentProvider);
+        statementExecutorCtx = new StatementExecutorContext();
         appCtx = new CcApplicationContext(ccServiceCtx, getHcc(), 
libraryManager, resourceIdManager,
                 () -> MetadataManager.INSTANCE, 
GlobalRecoveryManager.instance(), ftStrategy,
-                new ActiveLifecycleListener());
+                new ActiveLifecycleListener(), componentProvider);
         ClusterStateManager.INSTANCE.setCcAppCtx(appCtx);
         ccExtensionManager = new CCExtensionManager(getExtensions());
         appCtx.setExtensionManager(ccExtensionManager);
@@ -184,7 +189,7 @@ public class CCApplication extends BaseCCApplication {
         IHyracksClientConnection hcc = getHcc();
         webServer.setAttribute(HYRACKS_CONNECTION_ATTR, hcc);
         webServer.addServlet(new ApiServlet(webServer.ctx(), new String[] { 
"/*" }, appCtx,
-                ccExtensionManager.getAqlCompilationProvider(), 
ccExtensionManager.getSqlppCompilationProvider(),
+                ccExtensionManager.getCompilationProvider(AQL), 
ccExtensionManager.getCompilationProvider(SQLPP),
                 getStatementExecutorFactory(), componentProvider));
         return webServer;
     }
@@ -197,6 +202,8 @@ public class CCApplication extends BaseCCApplication {
         jsonAPIServer.setAttribute(ASTERIX_APP_CONTEXT_INFO_ATTR, appCtx);
         jsonAPIServer.setAttribute(ServletConstants.EXECUTOR_SERVICE_ATTR,
                 ccServiceCtx.getControllerService().getExecutor());
+        jsonAPIServer.setAttribute(ServletConstants.RUNNING_QUERIES_ATTR, 
statementExecutorCtx);
+        jsonAPIServer.setAttribute(ServletConstants.SERVICE_CONTEXT_ATTR, 
ccServiceCtx);
 
         // AQL rest APIs.
         addServlet(jsonAPIServer, Servlets.AQL_QUERY);
@@ -241,28 +248,28 @@ public class CCApplication extends BaseCCApplication {
     protected IServlet createServlet(ConcurrentMap<String, Object> ctx, String 
key, String... paths) {
         switch (key) {
             case Servlets.AQL:
-                return new FullApiServlet(ctx, paths, appCtx, 
ccExtensionManager.getAqlCompilationProvider(),
+                return new FullApiServlet(ctx, paths, appCtx, 
ccExtensionManager.getCompilationProvider(AQL),
                         getStatementExecutorFactory(), componentProvider);
             case Servlets.AQL_QUERY:
-                return new QueryApiServlet(ctx, paths, appCtx, 
ccExtensionManager.getAqlCompilationProvider(),
+                return new QueryApiServlet(ctx, paths, appCtx, 
ccExtensionManager.getCompilationProvider(AQL),
                         getStatementExecutorFactory(), componentProvider);
             case Servlets.AQL_UPDATE:
-                return new UpdateApiServlet(ctx, paths, appCtx, 
ccExtensionManager.getAqlCompilationProvider(),
+                return new UpdateApiServlet(ctx, paths, appCtx, 
ccExtensionManager.getCompilationProvider(AQL),
                         getStatementExecutorFactory(), componentProvider);
             case Servlets.AQL_DDL:
-                return new DdlApiServlet(ctx, paths, appCtx, 
ccExtensionManager.getAqlCompilationProvider(),
+                return new DdlApiServlet(ctx, paths, appCtx, 
ccExtensionManager.getCompilationProvider(AQL),
                         getStatementExecutorFactory(), componentProvider);
             case Servlets.SQLPP:
-                return new FullApiServlet(ctx, paths, appCtx, 
ccExtensionManager.getSqlppCompilationProvider(),
+                return new FullApiServlet(ctx, paths, appCtx, 
ccExtensionManager.getCompilationProvider(SQLPP),
                         getStatementExecutorFactory(), componentProvider);
             case Servlets.SQLPP_QUERY:
-                return new QueryApiServlet(ctx, paths, appCtx, 
ccExtensionManager.getSqlppCompilationProvider(),
+                return new QueryApiServlet(ctx, paths, appCtx, 
ccExtensionManager.getCompilationProvider(SQLPP),
                         getStatementExecutorFactory(), componentProvider);
             case Servlets.SQLPP_UPDATE:
-                return new UpdateApiServlet(ctx, paths, appCtx, 
ccExtensionManager.getSqlppCompilationProvider(),
+                return new UpdateApiServlet(ctx, paths, appCtx, 
ccExtensionManager.getCompilationProvider(SQLPP),
                         getStatementExecutorFactory(), componentProvider);
             case Servlets.SQLPP_DDL:
-                return new DdlApiServlet(ctx, paths, appCtx, 
ccExtensionManager.getSqlppCompilationProvider(),
+                return new DdlApiServlet(ctx, paths, appCtx, 
ccExtensionManager.getCompilationProvider(SQLPP),
                         getStatementExecutorFactory(), componentProvider);
             case Servlets.RUNNING_REQUESTS:
                 return new QueryCancellationServlet(ctx, paths);
@@ -271,8 +278,9 @@ public class CCApplication extends BaseCCApplication {
             case Servlets.QUERY_RESULT:
                 return new QueryResultApiServlet(ctx, paths, appCtx);
             case Servlets.QUERY_SERVICE:
-                return new QueryServiceServlet(ctx, paths, appCtx, 
ccExtensionManager.getSqlppCompilationProvider(),
-                        getStatementExecutorFactory(), componentProvider);
+                return new QueryServiceServlet(ctx, paths, appCtx, SQLPP,
+                        ccExtensionManager.getCompilationProvider(SQLPP), 
getStatementExecutorFactory(),
+                        componentProvider);
             case Servlets.CONNECTOR:
                 return new ConnectorApiServlet(ctx, paths, appCtx);
             case Servlets.SHUTDOWN:
@@ -292,13 +300,17 @@ public class CCApplication extends BaseCCApplication {
         }
     }
 
-    private IStatementExecutorFactory getStatementExecutorFactory() {
+    public IStatementExecutorFactory getStatementExecutorFactory() {
         return 
ccExtensionManager.getStatementExecutorFactory(ccServiceCtx.getControllerService().getExecutor());
     }
 
+    public IStatementExecutorContext getStatementExecutorContext() {
+        return statementExecutorCtx;
+    }
+
     @Override
     public void startupCompleted() throws Exception {
-        ccServiceCtx.getControllerService().getExecutor().submit((Callable) () 
-> {
+        ccServiceCtx.getControllerService().getExecutor().submit(() -> {
             ClusterStateManager.INSTANCE.waitForState(ClusterState.ACTIVE);
             
ClusterManagerProvider.getClusterManager().notifyStartupCompleted();
             return null;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4671f712/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
index 9c24acf..ec01e1c 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
@@ -49,21 +49,26 @@ import 
org.apache.asterix.transaction.management.resource.PersistentLocalResourc
 import org.apache.commons.io.FileUtils;
 import org.apache.hyracks.api.application.INCServiceContext;
 import org.apache.hyracks.api.application.IServiceContext;
+import org.apache.hyracks.api.client.ClusterControllerInfo;
+import org.apache.hyracks.api.client.HyracksConnection;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
 import org.apache.hyracks.api.config.IConfigManager;
 import org.apache.hyracks.api.job.resource.NodeCapacity;
 import org.apache.hyracks.api.messages.IMessageBroker;
 import org.apache.hyracks.control.common.controllers.NCConfig;
 import org.apache.hyracks.control.nc.BaseNCApplication;
 import org.apache.hyracks.control.nc.NodeControllerService;
+import org.apache.hyracks.http.server.WebManager;
 
 public class NCApplication extends BaseNCApplication {
     private static final Logger LOGGER = 
Logger.getLogger(NCApplication.class.getName());
 
-    private INCServiceContext ncServiceCtx;
+    protected INCServiceContext ncServiceCtx;
     private INcApplicationContext runtimeContext;
     private String nodeId;
     private boolean stopInitiated = false;
     private SystemState systemState;
+    protected WebManager webManager;
 
     @Override
     public void registerConfig(IConfigManager configManager) {
@@ -122,6 +127,8 @@ public class NCApplication extends BaseNCApplication {
             
localResourceRepository.initializeNewUniverse(ClusterProperties.INSTANCE.getStorageDirectoryName());
         }
 
+        webManager = new WebManager();
+
         performLocalCleanUp();
     }
 
@@ -131,6 +138,10 @@ public class NCApplication extends BaseNCApplication {
         Logger.getLogger("org.apache.asterix").setLevel(level);
     }
 
+    protected void configureServers() throws Exception {
+        // override to start web services on NC nodes
+    }
+
     protected List<AsterixExtension> getExtensions() {
         return Collections.emptyList();
     }
@@ -143,6 +154,9 @@ public class NCApplication extends BaseNCApplication {
             if (LOGGER.isLoggable(Level.INFO)) {
                 LOGGER.info("Stopping Asterix node controller: " + nodeId);
             }
+
+            webManager.stop();
+
             //Clean any temporary files
             performLocalCleanUp();
 
@@ -163,6 +177,10 @@ public class NCApplication extends BaseNCApplication {
 
     @Override
     public void startupCompleted() throws Exception {
+        // configure servlets after joining the cluster, so we can create 
HyracksClientConnection
+        configureServers();
+        webManager.start();
+
         // Since we don't pass initial run flag in 
AsterixHyracksIntegrationUtil, we use the virtualNC flag
         final NodeProperties nodeProperties = 
runtimeContext.getNodeProperties();
         if (systemState == SystemState.PERMANENT_DATA_LOSS
@@ -262,4 +280,10 @@ public class NCApplication extends BaseNCApplication {
     public INcApplicationContext getApplicationContext() {
         return runtimeContext;
     }
+
+    protected IHyracksClientConnection getHcc() throws Exception {
+        NodeControllerService ncSrv = (NodeControllerService) 
ncServiceCtx.getControllerService();
+        ClusterControllerInfo ccInfo = 
ncSrv.getNodeParameters().getClusterControllerInfo();
+        return new HyracksConnection(ccInfo.getClientNetAddress(), 
ccInfo.getClientNetPort());
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4671f712/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
index 630aabe..33e89f0 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
@@ -21,6 +21,7 @@ package org.apache.asterix.messaging;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -30,12 +31,16 @@ import org.apache.asterix.common.memory.ConcurrentFramePool;
 import org.apache.asterix.common.messaging.api.ICcAddressedMessage;
 import org.apache.asterix.common.messaging.api.INCMessageBroker;
 import org.apache.asterix.common.messaging.api.INcAddressedMessage;
+import org.apache.asterix.common.messaging.api.MessageFuture;
 import org.apache.hyracks.api.comm.IChannelControlBlock;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.messages.IMessage;
 import org.apache.hyracks.api.util.JavaSerializationUtils;
 import org.apache.hyracks.control.nc.NodeControllerService;
 
+import io.netty.util.collection.LongObjectHashMap;
+import io.netty.util.collection.LongObjectMap;
+
 public class NCMessageBroker implements INCMessageBroker {
     private static final Logger LOGGER = 
Logger.getLogger(NCMessageBroker.class.getName());
 
@@ -44,6 +49,8 @@ public class NCMessageBroker implements INCMessageBroker {
     private final LinkedBlockingQueue<INcAddressedMessage> receivedMsgsQ;
     private final ConcurrentFramePool messagingFramePool;
     private final int maxMsgSize;
+    private final AtomicLong futureIdGenerator;
+    private final LongObjectMap<MessageFuture> futureMap;
 
     public NCMessageBroker(NodeControllerService ncs, MessagingProperties 
messagingProperties) {
         this.ncs = ncs;
@@ -53,6 +60,8 @@ public class NCMessageBroker implements INCMessageBroker {
         messagingFramePool = new ConcurrentFramePool(ncs.getId(), 
messagingMemoryBudget,
                 messagingProperties.getFrameSize());
         receivedMsgsQ = new LinkedBlockingQueue<>();
+        futureIdGenerator = new AtomicLong();
+        futureMap = new LongObjectHashMap<>();
         MessageDeliveryService msgDeliverySvc = new MessageDeliveryService();
         appContext.getThreadExecutor().execute(msgDeliverySvc);
     }
@@ -104,6 +113,26 @@ public class NCMessageBroker implements INCMessageBroker {
         ccb.getWriteInterface().getFullBufferAcceptor().accept(msgBuffer);
     }
 
+    @Override
+    public MessageFuture registerMessageFuture() {
+        long futureId = futureIdGenerator.incrementAndGet();
+        MessageFuture future = new MessageFuture(futureId);
+        synchronized (futureMap) {
+            if (futureMap.containsKey(futureId)) {
+                throw new IllegalStateException();
+            }
+            futureMap.put(futureId, future);
+        }
+        return future;
+    }
+
+    @Override
+    public MessageFuture deregisterMessageFuture(long futureId) {
+        synchronized (futureMap) {
+            return futureMap.remove(futureId);
+        }
+    }
+
     private class MessageDeliveryService implements Runnable {
         /*
          * TODO Currently this thread is not stopped when it is interrupted 
because

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4671f712/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
index bffaeef..d1ff871 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
@@ -69,7 +69,7 @@ import org.apache.asterix.runtime.utils.ClusterStateManager;
 import org.apache.asterix.runtime.utils.RuntimeUtils;
 import org.apache.asterix.translator.CompiledStatements;
 import org.apache.asterix.translator.IStatementExecutor;
-import org.apache.asterix.translator.SessionConfig;
+import org.apache.asterix.translator.SessionOutput;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
@@ -151,7 +151,7 @@ public class FeedOperations {
         return spec;
     }
 
-    private static JobSpecification getConnectionJob(SessionConfig 
sessionConfig, MetadataProvider metadataProvider,
+    private static JobSpecification getConnectionJob(SessionOutput 
sessionOutput, MetadataProvider metadataProvider,
             FeedConnection feedConnection, String[] locations, 
ILangCompilationProvider compilationProvider,
             IStorageComponentProvider storageComponentProvider, 
DefaultStatementExecutorFactory qtFactory,
             IHyracksClientConnection hcc) throws AlgebricksException, 
RemoteException, ACIDException {
@@ -165,7 +165,7 @@ public class FeedOperations {
         statements.add(dataverseDecl);
         statements.add(subscribeStmt);
         IStatementExecutor translator = 
qtFactory.create(metadataProvider.getApplicationContext(), statements,
-                sessionConfig, compilationProvider, storageComponentProvider);
+                sessionOutput, compilationProvider, storageComponentProvider);
         // configure the metadata provider
         
metadataProvider.getConfig().put(FunctionUtil.IMPORT_PRIVATE_FUNCTIONS, "" + 
Boolean.TRUE);
         metadataProvider.getConfig().put(FeedActivityDetails.FEED_POLICY_NAME, 
"" + subscribeStmt.getPolicy());
@@ -357,7 +357,7 @@ public class FeedOperations {
     }
 
     public static Pair<JobSpecification, 
AlgebricksAbsolutePartitionConstraint> buildStartFeedJob(
-            SessionConfig sessionConfig, MetadataProvider metadataProvider, 
Feed feed,
+            SessionOutput sessionOutput, MetadataProvider metadataProvider, 
Feed feed,
             List<FeedConnection> feedConnections, ILangCompilationProvider 
compilationProvider,
             IStorageComponentProvider storageComponentProvider, 
DefaultStatementExecutorFactory qtFactory,
             IHyracksClientConnection hcc) throws Exception {
@@ -372,7 +372,7 @@ public class FeedOperations {
         String[] ingestionLocations = 
ingestionAdaptorFactory.getPartitionConstraint().getLocations();
         // Add connection job
         for (FeedConnection feedConnection : feedConnections) {
-            JobSpecification connectionJob = getConnectionJob(sessionConfig, 
metadataProvider, feedConnection,
+            JobSpecification connectionJob = getConnectionJob(sessionOutput, 
metadataProvider, feedConnection,
                     ingestionLocations, compilationProvider, 
storageComponentProvider, qtFactory, hcc);
             jobsList.add(connectionJob);
         }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4671f712/asterixdb/asterix-app/src/test/java/org/apache/asterix/aql/translator/QueryTranslatorTest.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/aql/translator/QueryTranslatorTest.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/aql/translator/QueryTranslatorTest.java
index e2885b3..5e3da5f 100644
--- 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/aql/translator/QueryTranslatorTest.java
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/aql/translator/QueryTranslatorTest.java
@@ -39,7 +39,7 @@ import org.apache.asterix.lang.common.base.Statement;
 import org.apache.asterix.lang.common.statement.RunStatement;
 import org.apache.asterix.runtime.utils.CcApplicationContext;
 import org.apache.asterix.translator.IStatementExecutor;
-import org.apache.asterix.translator.SessionConfig;
+import org.apache.asterix.translator.SessionOutput;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -51,7 +51,7 @@ public class QueryTranslatorTest {
     @Test
     public void test() throws Exception {
         List<Statement> statements = new ArrayList<>();
-        SessionConfig mockSessionConfig = mock(SessionConfig.class);
+        SessionOutput mockSessionOutput = mock(SessionOutput.class);
         RunStatement mockRunStatement = mock(RunStatement.class);
 
         // Mocks AppContextInfo.
@@ -70,7 +70,7 @@ public class QueryTranslatorTest {
         when(mockMasterNode.getClientIp()).thenReturn("127.0.0.1");
 
         IStatementExecutor aqlTranslator = new 
DefaultStatementExecutorFactory().create(mockAsterixAppContextInfo,
-                statements, mockSessionConfig, new AqlCompilationProvider(), 
new StorageComponentProvider());
+                statements, mockSessionOutput, new AqlCompilationProvider(), 
new StorageComponentProvider());
         List<String> parameters = new ArrayList<>();
         parameters.add("examples/pregelix-example-jar-with-dependencies.jar");
         parameters.add("org.apache.pregelix.example.PageRankVertex");

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4671f712/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
index f1e6aca..9913493 100644
--- 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
@@ -32,11 +32,13 @@ import java.io.StringWriter;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.net.Inet4Address;
+import java.net.InetSocketAddress;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -118,17 +120,24 @@ public class TestExecutor {
     /*
      * Instance members
      */
-    protected final String host;
-    protected final int port;
+    protected final List<InetSocketAddress> endpoints;
+    protected int endpointSelector;
     protected ITestLibrarian librarian;
 
+    public TestExecutor() {
+        this(Inet4Address.getLoopbackAddress().getHostAddress(), 19002);
+    }
+
     public TestExecutor(String host, int port) {
-        this.host = host;
-        this.port = port;
+        this(InetSocketAddress.createUnresolved(host, port));
     }
 
-    public TestExecutor() {
-        this(Inet4Address.getLoopbackAddress().getHostAddress(), 19002);
+    public TestExecutor(InetSocketAddress endpoint) {
+        this(Collections.singletonList(endpoint));
+    }
+
+    public TestExecutor(List<InetSocketAddress> endpoints) {
+        this.endpoints = endpoints;
     }
 
     public void setLibrarian(ITestLibrarian librarian) {
@@ -373,7 +382,7 @@ public class TestExecutor {
                     continue;
                 }
                 throw new Exception(
-                        "Result for " + scriptFile + ": expected pattern '" + 
expression + "' not found in result.");
+                        "Result for " + scriptFile + ": expected pattern '" + 
expression + "' not found in result: "+actual);
             }
         } catch (Exception e) {
             System.err.println("Actual results file: " + 
actualFile.toString());
@@ -811,6 +820,7 @@ public class TestExecutor {
                 break;
             case "mgx":
                 executeManagixCommand(stripLineComments(statement).trim());
+                Thread.sleep(8000);
                 break;
             case "txnqbc": // qbc represents query before crash
                 InputStream resultStream = executeQuery(statement, 
OutputFormat.forCompilationUnit(cUnit),
@@ -1165,7 +1175,7 @@ public class TestExecutor {
 
     protected InputStream executeHttp(String ctxType, String endpoint, 
OutputFormat fmt) throws Exception {
         String[] split = endpoint.split("\\?");
-        URI uri = new URI("http", null, host, port, split[0], split.length > 1 
? split[1] : null, null);
+        URI uri = createEndpointURI(split[0], split.length > 1 ? split[1] : 
null);
         return executeURI(ctxType, uri, fmt);
     }
 
@@ -1184,7 +1194,7 @@ public class TestExecutor {
         //get node process id
         OutputFormat fmt = OutputFormat.forCompilationUnit(cUnit);
         String endpoint = "/admin/cluster/node/" + nodeId + "/config";
-        InputStream executeJSONGet = executeJSONGet(fmt, new URI("http", null, 
host, port, endpoint, null, null));
+        InputStream executeJSONGet = executeJSONGet(fmt, 
createEndpointURI(endpoint, null));
         StringWriter actual = new StringWriter();
         IOUtils.copy(executeJSONGet, actual, StandardCharsets.UTF_8);
         String config = actual.toString();
@@ -1201,7 +1211,7 @@ public class TestExecutor {
     private void deleteNCTxnLogs(String nodeId, CompilationUnit cUnit) throws 
Exception {
         OutputFormat fmt = OutputFormat.forCompilationUnit(cUnit);
         String endpoint = "/admin/cluster/node/" + nodeId + "/config";
-        InputStream executeJSONGet = executeJSONGet(fmt, new URI("http://"; + 
host + ":" + port + endpoint));
+        InputStream executeJSONGet = executeJSONGet(fmt, 
createEndpointURI(endpoint, null));
         StringWriter actual = new StringWriter();
         IOUtils.copy(executeJSONGet, actual, StandardCharsets.UTF_8);
         String config = actual.toString();
@@ -1280,8 +1290,16 @@ public class TestExecutor {
                         + cUnit.getName() + "_qbc.adm");
     }
 
+    protected URI createEndpointURI(String path, String query) throws 
URISyntaxException {
+        int endpointIdx = Math.abs(endpointSelector++ % endpoints.size());
+        InetSocketAddress endpoint = endpoints.get(endpointIdx);
+        URI uri = new URI("http", null, endpoint.getHostString(), 
endpoint.getPort(), path, query, null);
+        LOGGER.fine("Created endpoint URI: " + uri);
+        return uri;
+    }
+
     protected URI getEndpoint(String servlet) throws URISyntaxException {
-        return new URI("http", null, host, port, 
getPath(servlet).replaceAll("/\\*$", ""), null, null);
+        return createEndpointURI(getPath(servlet).replaceAll("/\\*$", ""), 
null);
     }
 
     public static String stripJavaComments(String text) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4671f712/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/AsyncDeferredQueries.xml
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/AsyncDeferredQueries.xml
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/AsyncDeferredQueries.xml
new file mode 100644
index 0000000..9a2ad3c
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/AsyncDeferredQueries.xml
@@ -0,0 +1,52 @@
+<!--
+ ! Licensed to the Apache Software Foundation (ASF) under one
+ ! or more contributor license agreements.  See the NOTICE file
+ ! distributed with this work for additional information
+ ! regarding copyright ownership.  The ASF licenses this file
+ ! to you under the Apache License, Version 2.0 (the
+ ! "License"); you may not use this file except in compliance
+ ! with the License.  You may obtain a copy of the License at
+ !
+ !   http://www.apache.org/licenses/LICENSE-2.0
+ !
+ ! Unless required by applicable law or agreed to in writing,
+ ! software distributed under the License is distributed on an
+ ! "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ! KIND, either express or implied.  See the License for the
+ ! specific language governing permissions and limitations
+ ! under the License.
+ !-->
+<test-group name="async-deferred">
+    <test-case FilePath="async-deferred">
+        <compilation-unit name="async-failed">
+            <output-dir compare="Text">async-failed</output-dir>
+            <expected-error>Injected failure in 
asterix:inject-failure</expected-error>
+        </compilation-unit>
+    </test-case>
+    <test-case FilePath="async-deferred">
+        <compilation-unit name="async-compilation-failed">
+            <output-dir compare="Text">async-compilation-failed</output-dir>
+            <expected-error>Cannot find dataset gargel</expected-error>
+        </compilation-unit>
+    </test-case>
+    <test-case FilePath="async-deferred">
+        <compilation-unit name="deferred">
+            <output-dir compare="Text">deferred</output-dir>
+        </compilation-unit>
+    </test-case>
+    <test-case FilePath="async-deferred">
+        <compilation-unit name="async">
+            <output-dir compare="Text">async</output-dir>
+        </compilation-unit>
+    </test-case>
+    <test-case FilePath="async-deferred">
+        <compilation-unit name="async-repeated">
+            <output-dir compare="Text">async-repeated</output-dir>
+        </compilation-unit>
+    </test-case>
+    <test-case FilePath="async-deferred">
+        <compilation-unit name="async-running">
+            <output-dir compare="Text">async-running</output-dir>
+        </compilation-unit>
+    </test-case>
+</test-group>

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4671f712/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml 
b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
index 8f7ffc3..d5716d1 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
@@ -18,43 +18,10 @@
  !-->
 <!DOCTYPE test-suite [
   <!ENTITY RecordsQueries SYSTEM "queries_sqlpp/objects/ObjectsQueries.xml">
-
+  <!ENTITY AsyncDeferredQueries SYSTEM 
"queries_sqlpp/async-deferred/AsyncDeferredQueries.xml">
 ]>
 <test-suite xmlns="urn:xml.testframework.asterix.apache.org" 
ResultOffsetPath="results" QueryOffsetPath="queries_sqlpp" 
QueryFileExtension=".sqlpp">
-  <test-group name="async-deferred">
-    <test-case FilePath="async-deferred">
-      <compilation-unit name="async-failed">
-        <output-dir compare="Text">async-failed</output-dir>
-        <expected-error>Injected failure in 
asterix:inject-failure</expected-error>
-      </compilation-unit>
-    </test-case>
-    <test-case FilePath="async-deferred">
-      <compilation-unit name="async-compilation-failed">
-        <output-dir compare="Text">async-compilation-failed</output-dir>
-        <expected-error>Cannot find dataset gargel</expected-error>
-      </compilation-unit>
-    </test-case>
-    <test-case FilePath="async-deferred">
-      <compilation-unit name="deferred">
-        <output-dir compare="Text">deferred</output-dir>
-      </compilation-unit>
-    </test-case>
-    <test-case FilePath="async-deferred">
-      <compilation-unit name="async">
-        <output-dir compare="Text">async</output-dir>
-      </compilation-unit>
-    </test-case>
-    <test-case FilePath="async-deferred">
-      <compilation-unit name="async-repeated">
-        <output-dir compare="Text">async-repeated</output-dir>
-      </compilation-unit>
-    </test-case>
-    <test-case FilePath="async-deferred">
-      <compilation-unit name="async-running">
-        <output-dir compare="Text">async-running</output-dir>
-      </compilation-unit>
-    </test-case>
-  </test-group>
+  &AsyncDeferredQueries;
   <test-group name="flwor">
     <test-case FilePath="flwor">
       <compilation-unit name="at00">

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4671f712/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
index a9e6448..e5b78cf 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
@@ -20,6 +20,7 @@ package org.apache.asterix.common.dataflow;
 
 import org.apache.asterix.common.api.IApplicationContext;
 import org.apache.asterix.common.cluster.IGlobalRecoveryManager;
+import org.apache.asterix.common.context.IStorageComponentProvider;
 import org.apache.asterix.common.transactions.IResourceIdManager;
 import org.apache.hyracks.api.application.ICCServiceContext;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
@@ -72,5 +73,24 @@ public interface ICcApplicationContext extends 
IApplicationContext {
      */
     public IHyracksClientConnection getHcc();
 
+    /**
+     * Returns the resource manager
+     *
+     * @return {@link IResourceIdManager} implementation instance
+     */
     public IResourceIdManager getResourceIdManager();
+
+    /**
+     * Returns the storage component provider
+     *
+     * @return {@link IStorageComponentProvider} implementation instance
+     */
+    public IStorageComponentProvider getStorageComponentProvider();
+
+    /**
+     * Returns the extension manager
+     *
+     * @return the extension manager instance
+     */
+    public Object getExtensionManager();
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4671f712/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/INCMessageBroker.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/INCMessageBroker.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/INCMessageBroker.java
index e1101b3..86d1074 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/INCMessageBroker.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/INCMessageBroker.java
@@ -26,7 +26,6 @@ public interface INCMessageBroker extends IMessageBroker {
      * Sends application message from this NC to the CC.
      *
      * @param message
-     * @param callback
      * @throws Exception
      */
     public void sendMessageToCC(ICcAddressedMessage message) throws Exception;
@@ -35,7 +34,6 @@ public interface INCMessageBroker extends IMessageBroker {
      * Sends application message from this NC to another NC.
      *
      * @param message
-     * @param callback
      * @throws Exception
      */
     public void sendMessageToNC(String nodeId, INcAddressedMessage message)
@@ -47,4 +45,17 @@ public interface INCMessageBroker extends IMessageBroker {
      * @param msg
      */
     public void queueReceivedMessage(INcAddressedMessage msg);
+
+    /**
+     * Creates and registers a Future for a message that will be send through 
this broker
+     * @return new Future
+     */
+    MessageFuture registerMessageFuture();
+
+    /**
+     * Removes a previously registered Future
+     * @param futureId future identifier
+     * @return existing Future or {@code null} if there was no Future 
associated with this identifier
+     */
+    MessageFuture deregisterMessageFuture(long futureId);
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4671f712/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/MessageFuture.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/MessageFuture.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/MessageFuture.java
new file mode 100644
index 0000000..f4ed1ff
--- /dev/null
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/MessageFuture.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.common.messaging.api;
+
+import org.apache.hyracks.api.messages.IMessage;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * A {@link CompletableFuture} associated with an identifier
+ */
+public class MessageFuture extends CompletableFuture<IMessage> {
+
+    private final long futureId;
+
+    public MessageFuture(long futureId) {
+        this.futureId = futureId;
+    }
+
+    public long getFutureId() {
+        return futureId;
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4671f712/asterixdb/asterix-external-data/pom.xml
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/pom.xml 
b/asterixdb/asterix-external-data/pom.xml
index d9ca295..e341fef 100644
--- a/asterixdb/asterix-external-data/pom.xml
+++ b/asterixdb/asterix-external-data/pom.xml
@@ -281,6 +281,10 @@
           <groupId>commons-logging</groupId>
           <artifactId>commons-logging</artifactId>
         </exclusion>
+        <exclusion>
+          <groupId>xerces</groupId>
+          <artifactId>xercesImpl</artifactId>
+        </exclusion>
       </exclusions>
     </dependency>
     <dependency>

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4671f712/asterixdb/asterix-installer/src/test/java/org/apache/asterix/installer/test/AsterixLifecycleIT.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-installer/src/test/java/org/apache/asterix/installer/test/AsterixLifecycleIT.java
 
b/asterixdb/asterix-installer/src/test/java/org/apache/asterix/installer/test/AsterixLifecycleIT.java
index 3c65a83..814e109 100644
--- 
a/asterixdb/asterix-installer/src/test/java/org/apache/asterix/installer/test/AsterixLifecycleIT.java
+++ 
b/asterixdb/asterix-installer/src/test/java/org/apache/asterix/installer/test/AsterixLifecycleIT.java
@@ -88,6 +88,7 @@ public class AsterixLifecycleIT {
             
AsterixInstallerIntegrationUtil.transformIntoRequiredState(State.ACTIVE);
             String command = "stop -n " + 
AsterixInstallerIntegrationUtil.ASTERIX_INSTANCE_NAME;
             cmdHandler.processCommand(command.split(" "));
+            Thread.sleep(4000);
             AsterixInstance instance = 
ServiceProvider.INSTANCE.getLookupService()
                     
.getAsterixInstance(AsterixInstallerIntegrationUtil.ASTERIX_INSTANCE_NAME);
             AsterixRuntimeState state = 
VerificationUtil.getAsterixRuntimeState(instance);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4671f712/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
index 8608d68..a4c271c 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
@@ -34,6 +34,7 @@ import org.apache.asterix.common.config.PropertiesAccessor;
 import org.apache.asterix.common.config.ReplicationProperties;
 import org.apache.asterix.common.config.StorageProperties;
 import org.apache.asterix.common.config.TransactionProperties;
+import org.apache.asterix.common.context.IStorageComponentProvider;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.library.ILibraryManager;
@@ -54,6 +55,7 @@ import org.apache.hyracks.storage.common.IStorageManager;
 public class CcApplicationContext implements ICcApplicationContext {
 
     private ICCServiceContext ccServiceCtx;
+    private IStorageComponentProvider storageComponentProvider;
     private IGlobalRecoveryManager globalRecoveryManager;
     private ILibraryManager libraryManager;
     private IResourceIdManager resourceIdManager;
@@ -77,7 +79,8 @@ public class CcApplicationContext implements 
ICcApplicationContext {
     public CcApplicationContext(ICCServiceContext ccServiceCtx, 
IHyracksClientConnection hcc,
             ILibraryManager libraryManager, IResourceIdManager 
resourceIdManager,
             Supplier<IMetadataBootstrap> metadataBootstrapSupplier, 
IGlobalRecoveryManager globalRecoveryManager,
-            IFaultToleranceStrategy ftStrategy, IJobLifecycleListener 
activeLifeCycleListener)
+            IFaultToleranceStrategy ftStrategy, IJobLifecycleListener 
activeLifeCycleListener,
+            IStorageComponentProvider storageComponentProvider)
             throws AsterixException, IOException {
         this.ccServiceCtx = ccServiceCtx;
         this.hcc = hcc;
@@ -102,6 +105,7 @@ public class CcApplicationContext implements 
ICcApplicationContext {
         this.nodeProperties = new NodeProperties(propertiesAccessor);
         this.metadataBootstrapSupplier = metadataBootstrapSupplier;
         this.globalRecoveryManager = globalRecoveryManager;
+        this.storageComponentProvider = storageComponentProvider;
     }
 
     @Override
@@ -174,6 +178,7 @@ public class CcApplicationContext implements 
ICcApplicationContext {
         return libraryManager;
     }
 
+    @Override
     public Object getExtensionManager() {
         return extensionManager;
     }
@@ -213,4 +218,9 @@ public class CcApplicationContext implements 
ICcApplicationContext {
     public IJobLifecycleListener getActiveLifecycleListener() {
         return activeLifeCycleListener;
     }
+
+    @Override
+    public IStorageComponentProvider getStorageComponentProvider() {
+        return storageComponentProvider;
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4671f712/asterixdb/asterix-server/src/main/assembly/filter.properties
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-server/src/main/assembly/filter.properties 
b/asterixdb/asterix-server/src/main/assembly/filter.properties
index b01047f..69c5b02 100644
--- a/asterixdb/asterix-server/src/main/assembly/filter.properties
+++ b/asterixdb/asterix-server/src/main/assembly/filter.properties
@@ -21,4 +21,5 @@ CC_COMMAND=asterixcc
 NC_COMMAND=asterixnc
 HELPER_COMMAND=asterixhelper
 LISTEN_PORT=19002
-PRODUCT=AsterixDB
\ No newline at end of file
+PRODUCT=AsterixDB
+NC_BLUE_EXTRA=
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4671f712/asterixdb/asterix-server/src/main/opt/local/conf/cc.conf
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-server/src/main/opt/local/conf/cc.conf 
b/asterixdb/asterix-server/src/main/opt/local/conf/cc.conf
index d1f03bc..184728d 100644
--- a/asterixdb/asterix-server/src/main/opt/local/conf/cc.conf
+++ b/asterixdb/asterix-server/src/main/opt/local/conf/cc.conf
@@ -25,6 +25,7 @@ ncservice.port=9091
 txn.log.dir=data/blue/txnlog
 core.dump.dir=data/blue/coredump
 iodevices=data/blue
+${NC_BLUE_EXTRA}
 
 [nc]
 storage.subdir=storage

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4671f712/asterixdb/asterix-test-framework/src/main/java/org/apache/asterix/testframework/xml/TestSuiteParser.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-test-framework/src/main/java/org/apache/asterix/testframework/xml/TestSuiteParser.java
 
b/asterixdb/asterix-test-framework/src/main/java/org/apache/asterix/testframework/xml/TestSuiteParser.java
index f34dc6a..ebd945e 100644
--- 
a/asterixdb/asterix-test-framework/src/main/java/org/apache/asterix/testframework/xml/TestSuiteParser.java
+++ 
b/asterixdb/asterix-test-framework/src/main/java/org/apache/asterix/testframework/xml/TestSuiteParser.java
@@ -20,14 +20,29 @@ package org.apache.asterix.testframework.xml;
 
 import java.io.File;
 
+import javax.xml.XMLConstants;
 import javax.xml.bind.JAXBContext;
+import javax.xml.bind.Unmarshaller;
+import javax.xml.parsers.SAXParser;
+import javax.xml.parsers.SAXParserFactory;
+import javax.xml.transform.sax.SAXSource;
+
+import org.xml.sax.InputSource;
 
 public class TestSuiteParser {
     public TestSuiteParser() {
     }
 
     public org.apache.asterix.testframework.xml.TestSuite parse(File 
testSuiteCatalog) throws Exception {
+        SAXParserFactory saxParserFactory = SAXParserFactory.newInstance();
+        saxParserFactory.setNamespaceAware(true);
+        saxParserFactory.setXIncludeAware(true);
+        SAXParser saxParser = saxParserFactory.newSAXParser();
+        saxParser.setProperty(XMLConstants.ACCESS_EXTERNAL_DTD, "file");
+
         JAXBContext ctx = 
JAXBContext.newInstance(org.apache.asterix.testframework.xml.TestSuite.class);
-        return (org.apache.asterix.testframework.xml.TestSuite) 
ctx.createUnmarshaller().unmarshal(testSuiteCatalog);
+        Unmarshaller um = ctx.createUnmarshaller();
+        return (org.apache.asterix.testframework.xml.TestSuite) 
um.unmarshal(new SAXSource(saxParser.getXMLReader(),
+                new InputSource(testSuiteCatalog.toURI().toString())));
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4671f712/asterixdb/asterix-yarn/pom.xml
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-yarn/pom.xml b/asterixdb/asterix-yarn/pom.xml
index 0fed1bb..b06c4b5 100644
--- a/asterixdb/asterix-yarn/pom.xml
+++ b/asterixdb/asterix-yarn/pom.xml
@@ -212,7 +212,6 @@
             
<usedDependency>org.apache.httpcomponents:httpclient</usedDependency>
             <usedDependency>org.apache.httpcomponents:httpcore</usedDependency>
             <usedDependency>org.slf4j:slf4j-simple</usedDependency>
-            <usedDependency>xerces:xercesImpl</usedDependency>
           </usedDependencies>
           <ignoredUnusedDeclaredDependencies>
             
<ignoredUnusedDeclaredDependency>org.apache.asterix:asterix-external-data:zip:*</ignoredUnusedDeclaredDependency>
@@ -388,11 +387,6 @@
       <scope>test</scope>
     </dependency>
     <dependency>
-      <groupId>xerces</groupId>
-      <artifactId>xercesImpl</artifactId>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-minicluster</artifactId>
       <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4671f712/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
 
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
index cdfc492..ca20f4a 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
@@ -47,6 +47,8 @@ public class HttpServer {
     // Constants
     private static final int LOW_WRITE_BUFFER_WATER_MARK = 8 * 1024;
     private static final int HIGH_WRITE_BUFFER_WATER_MARK = 32 * 1024;
+    protected static final WriteBufferWaterMark WRITE_BUFFER_WATER_MARK =
+            new WriteBufferWaterMark(LOW_WRITE_BUFFER_WATER_MARK, 
HIGH_WRITE_BUFFER_WATER_MARK);
     private static final Logger LOGGER = 
Logger.getLogger(HttpServer.class.getName());
     private static final int FAILED = -1;
     private static final int STOPPED = 0;
@@ -192,8 +194,7 @@ public class HttpServer {
         Collections.sort(servlets, (l1, l2) -> l2.getPaths()[0].length() - 
l1.getPaths()[0].length());
         ServerBootstrap b = new ServerBootstrap();
         b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
-                .childOption(ChannelOption.WRITE_BUFFER_WATER_MARK,
-                        new WriteBufferWaterMark(LOW_WRITE_BUFFER_WATER_MARK, 
HIGH_WRITE_BUFFER_WATER_MARK))
+                .childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, 
WRITE_BUFFER_WATER_MARK)
                 .handler(new LoggingHandler(LogLevel.DEBUG)).childHandler(new 
HttpServerInitializer(this));
         channel = b.bind(port).sync().channel();
     }
@@ -225,7 +226,6 @@ public class HttpServer {
                 }
             }
         }
-        LOGGER.warning("No servlet for " + uri);
         return null;
     }
 
@@ -254,7 +254,15 @@ public class HttpServer {
         return b && (path.length() == cpl || '/' == path.charAt(cpl));
     }
 
+    protected HttpServerHandler createHttpHandler(int chunkSize) {
+        return new HttpServerHandler<>(this, chunkSize);
+    }
+
     public ExecutorService getExecutor() {
         return executor;
     }
+
+    protected EventLoopGroup getWorkerGroup() {
+        return workerGroup;
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4671f712/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerHandler.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerHandler.java
 
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerHandler.java
index 6f7ccba..00b3cb6 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerHandler.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerHandler.java
@@ -33,15 +33,16 @@ import io.netty.channel.SimpleChannelInboundHandler;
 import io.netty.handler.codec.http.DefaultHttpResponse;
 import io.netty.handler.codec.http.FullHttpRequest;
 import io.netty.handler.codec.http.HttpResponseStatus;
+import io.netty.handler.codec.http.HttpVersion;
 
-public class HttpServerHandler extends SimpleChannelInboundHandler<Object> {
+public class HttpServerHandler<T extends HttpServer> extends 
SimpleChannelInboundHandler<Object> {
 
     private static final Logger LOGGER = 
Logger.getLogger(HttpServerHandler.class.getName());
-    protected final HttpServer server;
+    protected final T server;
     protected final int chunkSize;
     protected HttpRequestHandler handler;
 
-    public HttpServerHandler(HttpServer server, int chunkSize) {
+    public HttpServerHandler(T server, int chunkSize) {
         this.server = server;
         this.chunkSize = chunkSize;
     }
@@ -65,18 +66,18 @@ public class HttpServerHandler extends 
SimpleChannelInboundHandler<Object> {
         try {
             IServlet servlet = server.getServlet(request);
             if (servlet == null) {
-                respond(ctx, request, HttpResponseStatus.NOT_FOUND);
+                handleServletNotFound(ctx, request);
             } else {
                 submit(ctx, servlet, request);
             }
         } catch (Exception e) {
             LOGGER.log(Level.SEVERE, "Failure Submitting HTTP Request", e);
-            respond(ctx, request, HttpResponseStatus.INTERNAL_SERVER_ERROR);
+            respond(ctx, request.protocolVersion(), 
HttpResponseStatus.INTERNAL_SERVER_ERROR);
         }
     }
 
-    private void respond(ChannelHandlerContext ctx, FullHttpRequest request, 
HttpResponseStatus status) {
-        DefaultHttpResponse response = new 
DefaultHttpResponse(request.protocolVersion(), status);
+    protected void respond(ChannelHandlerContext ctx, HttpVersion httpVersion, 
HttpResponseStatus status) {
+        DefaultHttpResponse response = new DefaultHttpResponse(httpVersion, 
status);
         ctx.write(response).addListener(ChannelFutureListener.CLOSE);
     }
 
@@ -86,7 +87,7 @@ public class HttpServerHandler extends 
SimpleChannelInboundHandler<Object> {
             servletRequest = HttpUtil.toServletRequest(request);
         } catch (IllegalArgumentException e) {
             LOGGER.log(Level.WARNING, "Failure Decoding Request", e);
-            respond(ctx, request, HttpResponseStatus.BAD_REQUEST);
+            respond(ctx, request.protocolVersion(), 
HttpResponseStatus.BAD_REQUEST);
             return;
         }
         handler = new HttpRequestHandler(ctx, servlet, servletRequest, 
chunkSize);
@@ -102,6 +103,13 @@ public class HttpServerHandler extends 
SimpleChannelInboundHandler<Object> {
         }
     }
 
+    protected void handleServletNotFound(ChannelHandlerContext ctx, 
FullHttpRequest request) {
+        if (LOGGER.isLoggable(Level.WARNING)) {
+            LOGGER.warning("No servlet for " + request.uri());
+        }
+        respond(ctx, request.protocolVersion(), HttpResponseStatus.NOT_FOUND);
+    }
+
     @Override
     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
         LOGGER.log(Level.SEVERE, "Failure handling HTTP Request", cause);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4671f712/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerInitializer.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerInitializer.java
 
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerInitializer.java
index bc67865..4f8655f 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerInitializer.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerInitializer.java
@@ -44,6 +44,6 @@ public class HttpServerInitializer extends 
ChannelInitializer<SocketChannel> {
                 MAX_REQUEST_CHUNK_SIZE));
         p.addLast(new HttpResponseEncoder());
         p.addLast(new HttpObjectAggregator(Integer.MAX_VALUE));
-        p.addLast(new HttpServerHandler(server, RESPONSE_CHUNK_SIZE));
+        p.addLast(server.createHttpHandler(RESPONSE_CHUNK_SIZE));
     }
 }

Reply via email to