Repository: asterixdb Updated Branches: refs/heads/master 7a0ba78d3 -> 4671f7127
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4671f712/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/DefaultStatementExecutorFactory.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/DefaultStatementExecutorFactory.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/DefaultStatementExecutorFactory.java index e2f17ac..00fb1b0 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/DefaultStatementExecutorFactory.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/DefaultStatementExecutorFactory.java @@ -28,7 +28,7 @@ import org.apache.asterix.compiler.provider.ILangCompilationProvider; import org.apache.asterix.lang.common.base.Statement; import org.apache.asterix.translator.IStatementExecutor; import org.apache.asterix.translator.IStatementExecutorFactory; -import org.apache.asterix.translator.SessionConfig; +import org.apache.asterix.translator.SessionOutput; import org.apache.hyracks.control.common.utils.HyracksThreadFactory; public class DefaultStatementExecutorFactory implements IStatementExecutorFactory { @@ -48,9 +48,9 @@ public class DefaultStatementExecutorFactory implements IStatementExecutorFactor } @Override - public IStatementExecutor create(ICcApplicationContext appCtx, List<Statement> statements, SessionConfig conf, + public IStatementExecutor create(ICcApplicationContext appCtx, List<Statement> statements, SessionOutput output, ILangCompilationProvider compilationProvider, IStorageComponentProvider storageComponentProvider) { - return new QueryTranslator(appCtx, statements, conf, compilationProvider, storageComponentProvider, + return new QueryTranslator(appCtx, statements, output, compilationProvider, storageComponentProvider, executorService); } } 
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4671f712/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java index d37de0d..e0648ce 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java @@ -161,6 +161,7 @@ import org.apache.asterix.translator.CompiledStatements.ICompiledDmlStatement; import org.apache.asterix.translator.IStatementExecutor; import org.apache.asterix.translator.IStatementExecutorContext; import org.apache.asterix.translator.SessionConfig; +import org.apache.asterix.translator.SessionOutput; import org.apache.asterix.translator.TypeTranslator; import org.apache.asterix.translator.util.ValidateUtil; import org.apache.asterix.utils.DataverseUtil; @@ -170,6 +171,7 @@ import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.mutable.Mutable; import org.apache.commons.lang3.mutable.MutableBoolean; import org.apache.commons.lang3.mutable.MutableObject; +import org.apache.commons.lang3.tuple.Triple; import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.algebricks.common.utils.Pair; @@ -203,6 +205,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen public static final boolean IS_DEBUG_MODE = false;// true protected final List<Statement> statements; protected final ICcApplicationContext appCtx; + protected final SessionOutput sessionOutput; protected final SessionConfig sessionConfig; protected Dataverse 
activeDataverse; protected final List<FunctionDecl> declaredFunctions; @@ -211,12 +214,13 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen protected final IStorageComponentProvider componentProvider; protected final ExecutorService executorService; - public QueryTranslator(ICcApplicationContext appCtx, List<Statement> statements, SessionConfig conf, + public QueryTranslator(ICcApplicationContext appCtx, List<Statement> statements, SessionOutput output, ILangCompilationProvider compliationProvider, IStorageComponentProvider componentProvider, ExecutorService executorService) { this.appCtx = appCtx; this.statements = statements; - this.sessionConfig = conf; + this.sessionOutput = output; + this.sessionConfig = output.config(); this.componentProvider = componentProvider; declaredFunctions = getDeclaredFunctions(statements); apiFramework = new APIFramework(compliationProvider); @@ -241,13 +245,14 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen @Override public void compileAndExecute(IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery resultDelivery, - Stats stats) throws Exception { - compileAndExecute(hcc, hdc, resultDelivery, stats, null, null); + ResultMetadata outMetadata, Stats stats) throws Exception { + compileAndExecute(hcc, hdc, resultDelivery, outMetadata, stats, null, null); } @Override public void compileAndExecute(IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery resultDelivery, - Stats stats, String clientContextId, IStatementExecutorContext ctx) throws Exception { + ResultMetadata outMetadata, Stats stats, String clientContextId, IStatementExecutorContext ctx) + throws Exception { int resultSetIdCounter = 0; FileSplit outputFile = null; IAWriterFactory writerFactory = PrinterBasedWriterFactory.INSTANCE; @@ -262,7 +267,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen try { for (Statement stmt : statements) { if 
(sessionConfig.is(SessionConfig.FORMAT_HTML)) { - sessionConfig.out().println(ApiServlet.HTML_STATEMENT_SEPARATOR); + sessionOutput.out().println(ApiServlet.HTML_STATEMENT_SEPARATOR); } validateOperation(appCtx, activeDataverse, stmt); rewriteStatement(stmt); // Rewrite the statement's AST. @@ -324,8 +329,8 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen metadataProvider.setResultAsyncMode(resultDelivery == ResultDelivery.ASYNC || resultDelivery == ResultDelivery.DEFERRED); } - handleInsertUpsertStatement(metadataProvider, stmt, hcc, hdc, resultDelivery, stats, false, - clientContextId, ctx); + handleInsertUpsertStatement(metadataProvider, stmt, hcc, hdc, resultDelivery, outMetadata, + stats, false, clientContextId, ctx); break; case Statement.Kind.DELETE: handleDeleteStatement(metadataProvider, stmt, hcc, false); @@ -358,8 +363,8 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen metadataProvider.setResultSetId(new ResultSetId(resultSetIdCounter++)); metadataProvider.setResultAsyncMode( resultDelivery == ResultDelivery.ASYNC || resultDelivery == ResultDelivery.DEFERRED); - handleQuery(metadataProvider, (Query) stmt, hcc, hdc, resultDelivery, stats, clientContextId, - ctx); + handleQuery(metadataProvider, (Query) stmt, hcc, hdc, resultDelivery, outMetadata, stats, + clientContextId, ctx); break; case Statement.Kind.COMPACT: handleCompactStatement(metadataProvider, stmt, hcc); @@ -1694,7 +1699,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen CompiledLoadFromFileStatement cls = new CompiledLoadFromFileStatement(dataverseName, loadStmt.getDatasetName().getValue(), loadStmt.getAdapter(), loadStmt.getProperties(), loadStmt.dataIsAlreadySorted()); - JobSpecification spec = apiFramework.compileQuery(hcc, metadataProvider, null, 0, null, sessionConfig, cls); + JobSpecification spec = apiFramework.compileQuery(hcc, metadataProvider, null, 0, null, sessionOutput, cls); 
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); bActiveTxn = false; if (spec != null) { @@ -1712,8 +1717,8 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen public JobSpecification handleInsertUpsertStatement(MetadataProvider metadataProvider, Statement stmt, IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery resultDelivery, - IStatementExecutor.Stats stats, boolean compileOnly, String clientContextId, IStatementExecutorContext ctx) - throws Exception { + ResultMetadata outMetadata, Stats stats, boolean compileOnly, String clientContextId, + IStatementExecutorContext ctx) throws Exception { InsertStatement stmtInsertUpsert = (InsertStatement) stmt; String dataverseName = getActiveDataverse(stmtInsertUpsert.getDataverseName()); final IMetadataLocker locker = new IMetadataLocker() { @@ -1755,7 +1760,8 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen } if (stmtInsertUpsert.getReturnExpression() != null) { - deliverResult(hcc, hdc, compiler, metadataProvider, locker, resultDelivery, stats, clientContextId, ctx); + deliverResult(hcc, hdc, compiler, metadataProvider, locker, resultDelivery, outMetadata, stats, + clientContextId, ctx); } else { locker.lock(); try { @@ -1811,11 +1817,11 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen // Query Rewriting (happens under the same ongoing metadata transaction) Pair<IReturningStatement, Integer> rewrittenResult = - apiFramework.reWriteQuery(declaredFunctions, metadataProvider, query, sessionConfig); + apiFramework.reWriteQuery(declaredFunctions, metadataProvider, query, sessionOutput); // Query Compilation (happens under the same ongoing metadata transaction) return apiFramework.compileQuery(clusterInfoCollector, metadataProvider, (Query) rewrittenResult.first, - rewrittenResult.second, stmt == null ? null : stmt.getDatasetName(), sessionConfig, stmt); + rewrittenResult.second, stmt == null ? 
null : stmt.getDatasetName(), sessionOutput, stmt); } private JobSpecification rewriteCompileInsertUpsert(IClusterInfoCollector clusterInfoCollector, @@ -1824,7 +1830,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen // Insert/upsert statement rewriting (happens under the same ongoing metadata transaction) Pair<IReturningStatement, Integer> rewrittenResult = - apiFramework.reWriteQuery(declaredFunctions, metadataProvider, insertUpsert, sessionConfig); + apiFramework.reWriteQuery(declaredFunctions, metadataProvider, insertUpsert, sessionOutput); InsertStatement rewrittenInsertUpsert = (InsertStatement) rewrittenResult.first; String dataverseName = getActiveDataverse(rewrittenInsertUpsert.getDataverseName()); @@ -1846,7 +1852,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen } // Insert/upsert statement compilation (happens under the same ongoing metadata transaction) return apiFramework.compileQuery(clusterInfoCollector, metadataProvider, rewrittenInsertUpsert.getQuery(), - rewrittenResult.second, datasetName, sessionConfig, clfrqs); + rewrittenResult.second, datasetName, sessionOutput, clfrqs); } protected void handleCreateFeedStatement(MetadataProvider metadataProvider, Statement stmt) throws Exception { @@ -2052,7 +2058,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen datasets.add(ds); } org.apache.commons.lang3.tuple.Pair<JobSpecification, AlgebricksAbsolutePartitionConstraint> jobInfo = - FeedOperations.buildStartFeedJob(sessionConfig, metadataProvider, feed, feedConnections, + FeedOperations.buildStartFeedJob(sessionOutput, metadataProvider, feed, feedConnections, compilationProvider, storageComponentProvider, qtFactory, hcc); JobSpecification feedJob = jobInfo.getLeft(); @@ -2287,8 +2293,8 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen } protected void handleQuery(MetadataProvider metadataProvider, Query query, 
IHyracksClientConnection hcc, - IHyracksDataset hdc, ResultDelivery resultDelivery, Stats stats, String clientContextId, - IStatementExecutorContext ctx) throws Exception { + IHyracksDataset hdc, ResultDelivery resultDelivery, ResultMetadata outMetadata, Stats stats, + String clientContextId, IStatementExecutorContext ctx) throws Exception { final IMetadataLocker locker = new IMetadataLocker() { @Override public void lock() { @@ -2318,12 +2324,14 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen throw e; } }; - deliverResult(hcc, hdc, compiler, metadataProvider, locker, resultDelivery, stats, clientContextId, ctx); + deliverResult(hcc, hdc, compiler, metadataProvider, locker, resultDelivery, outMetadata, stats, clientContextId, + ctx); } private void deliverResult(IHyracksClientConnection hcc, IHyracksDataset hdc, IStatementCompiler compiler, - MetadataProvider metadataProvider, IMetadataLocker locker, ResultDelivery resultDelivery, Stats stats, - String clientContextId, IStatementExecutorContext ctx) throws Exception { + MetadataProvider metadataProvider, IMetadataLocker locker, ResultDelivery resultDelivery, + ResultMetadata outMetadata, Stats stats, String clientContextId, IStatementExecutorContext ctx) + throws Exception { final ResultSetId resultSetId = metadataProvider.getResultSetId(); switch (resultDelivery) { case ASYNC: @@ -2339,13 +2347,17 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen case IMMEDIATE: createAndRunJob(hcc, null, compiler, locker, resultDelivery, id -> { final ResultReader resultReader = new ResultReader(hdc, id, resultSetId); - ResultUtil.printResults(appCtx, resultReader, sessionConfig, stats, + ResultUtil.printResults(appCtx, resultReader, sessionOutput, stats, metadataProvider.findOutputRecordType()); }, clientContextId, ctx); break; case DEFERRED: createAndRunJob(hcc, null, compiler, locker, resultDelivery, id -> { - ResultUtil.printResultHandle(sessionConfig, new 
ResultHandle(id, resultSetId)); + ResultUtil.printResultHandle(sessionOutput, new ResultHandle(id, resultSetId)); + if (outMetadata != null) { + outMetadata.getResultSets() + .add(Triple.of(id, resultSetId, metadataProvider.findOutputRecordType())); + } }, clientContextId, ctx); break; default: @@ -2360,8 +2372,8 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen try { createAndRunJob(hcc, jobId, compiler, locker, resultDelivery, id -> { final ResultHandle handle = new ResultHandle(id, resultSetId); - ResultUtil.printStatus(sessionConfig, AbstractQueryApiServlet.ResultStatus.RUNNING); - ResultUtil.printResultHandle(sessionConfig, handle); + ResultUtil.printStatus(sessionOutput, AbstractQueryApiServlet.ResultStatus.RUNNING); + ResultUtil.printResultHandle(sessionOutput, handle); synchronized (printed) { printed.setTrue(); printed.notify(); @@ -2370,8 +2382,8 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen } catch (Exception e) { if (JobId.INVALID.equals(jobId.getValue())) { // compilation failed - ResultUtil.printStatus(sessionConfig, AbstractQueryApiServlet.ResultStatus.FAILED); - ResultUtil.printError(sessionConfig.out(), e); + ResultUtil.printStatus(sessionOutput, AbstractQueryApiServlet.ResultStatus.FAILED); + ResultUtil.printError(sessionOutput.out(), e); } else { GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE, resultDelivery.name() + " job with id " + jobId.getValue() + " " + "failed", e); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4671f712/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java index bf7d5eb..57cc340 100644 --- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java @@ -19,17 +19,19 @@ package org.apache.asterix.hyracks.bootstrap; +import static org.apache.asterix.algebra.base.ILangExtension.Language.AQL; +import static org.apache.asterix.algebra.base.ILangExtension.Language.SQLPP; import static org.apache.asterix.api.http.servlet.ServletConstants.ASTERIX_APP_CONTEXT_INFO_ATTR; import static org.apache.asterix.api.http.servlet.ServletConstants.HYRACKS_CONNECTION_ATTR; import java.util.Arrays; import java.util.List; -import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentMap; import java.util.logging.Level; import java.util.logging.Logger; import org.apache.asterix.active.ActiveLifecycleListener; +import org.apache.asterix.api.http.ctx.StatementExecutorContext; import org.apache.asterix.api.http.server.ApiServlet; import org.apache.asterix.api.http.server.ClusterApiServlet; import org.apache.asterix.api.http.server.ClusterControllerDetailsApiServlet; @@ -73,6 +75,7 @@ import org.apache.asterix.metadata.cluster.ClusterManagerProvider; import org.apache.asterix.runtime.job.resource.JobCapacityController; import org.apache.asterix.runtime.utils.CcApplicationContext; import org.apache.asterix.runtime.utils.ClusterStateManager; +import org.apache.asterix.translator.IStatementExecutorContext; import org.apache.asterix.translator.IStatementExecutorFactory; import org.apache.hyracks.api.application.ICCServiceContext; import org.apache.hyracks.api.application.IServiceContext; @@ -95,6 +98,7 @@ public class CCApplication extends BaseCCApplication { protected ICCServiceContext ccServiceCtx; protected CCExtensionManager ccExtensionManager; protected IStorageComponentProvider componentProvider; + protected StatementExecutorContext statementExecutorCtx; protected WebManager webManager; protected CcApplicationContext appCtx; private 
IJobCapacityController jobCapacityController; @@ -124,9 +128,10 @@ public class CCApplication extends BaseCCApplication { ExternalLibraryUtils.setUpExternaLibraries(libraryManager, false); componentProvider = new StorageComponentProvider(); GlobalRecoveryManager.instantiate(ccServiceCtx, getHcc(), componentProvider); + statementExecutorCtx = new StatementExecutorContext(); appCtx = new CcApplicationContext(ccServiceCtx, getHcc(), libraryManager, resourceIdManager, () -> MetadataManager.INSTANCE, GlobalRecoveryManager.instance(), ftStrategy, - new ActiveLifecycleListener()); + new ActiveLifecycleListener(), componentProvider); ClusterStateManager.INSTANCE.setCcAppCtx(appCtx); ccExtensionManager = new CCExtensionManager(getExtensions()); appCtx.setExtensionManager(ccExtensionManager); @@ -184,7 +189,7 @@ public class CCApplication extends BaseCCApplication { IHyracksClientConnection hcc = getHcc(); webServer.setAttribute(HYRACKS_CONNECTION_ATTR, hcc); webServer.addServlet(new ApiServlet(webServer.ctx(), new String[] { "/*" }, appCtx, - ccExtensionManager.getAqlCompilationProvider(), ccExtensionManager.getSqlppCompilationProvider(), + ccExtensionManager.getCompilationProvider(AQL), ccExtensionManager.getCompilationProvider(SQLPP), getStatementExecutorFactory(), componentProvider)); return webServer; } @@ -197,6 +202,8 @@ public class CCApplication extends BaseCCApplication { jsonAPIServer.setAttribute(ASTERIX_APP_CONTEXT_INFO_ATTR, appCtx); jsonAPIServer.setAttribute(ServletConstants.EXECUTOR_SERVICE_ATTR, ccServiceCtx.getControllerService().getExecutor()); + jsonAPIServer.setAttribute(ServletConstants.RUNNING_QUERIES_ATTR, statementExecutorCtx); + jsonAPIServer.setAttribute(ServletConstants.SERVICE_CONTEXT_ATTR, ccServiceCtx); // AQL rest APIs. addServlet(jsonAPIServer, Servlets.AQL_QUERY); @@ -241,28 +248,28 @@ public class CCApplication extends BaseCCApplication { protected IServlet createServlet(ConcurrentMap<String, Object> ctx, String key, String... 
paths) { switch (key) { case Servlets.AQL: - return new FullApiServlet(ctx, paths, appCtx, ccExtensionManager.getAqlCompilationProvider(), + return new FullApiServlet(ctx, paths, appCtx, ccExtensionManager.getCompilationProvider(AQL), getStatementExecutorFactory(), componentProvider); case Servlets.AQL_QUERY: - return new QueryApiServlet(ctx, paths, appCtx, ccExtensionManager.getAqlCompilationProvider(), + return new QueryApiServlet(ctx, paths, appCtx, ccExtensionManager.getCompilationProvider(AQL), getStatementExecutorFactory(), componentProvider); case Servlets.AQL_UPDATE: - return new UpdateApiServlet(ctx, paths, appCtx, ccExtensionManager.getAqlCompilationProvider(), + return new UpdateApiServlet(ctx, paths, appCtx, ccExtensionManager.getCompilationProvider(AQL), getStatementExecutorFactory(), componentProvider); case Servlets.AQL_DDL: - return new DdlApiServlet(ctx, paths, appCtx, ccExtensionManager.getAqlCompilationProvider(), + return new DdlApiServlet(ctx, paths, appCtx, ccExtensionManager.getCompilationProvider(AQL), getStatementExecutorFactory(), componentProvider); case Servlets.SQLPP: - return new FullApiServlet(ctx, paths, appCtx, ccExtensionManager.getSqlppCompilationProvider(), + return new FullApiServlet(ctx, paths, appCtx, ccExtensionManager.getCompilationProvider(SQLPP), getStatementExecutorFactory(), componentProvider); case Servlets.SQLPP_QUERY: - return new QueryApiServlet(ctx, paths, appCtx, ccExtensionManager.getSqlppCompilationProvider(), + return new QueryApiServlet(ctx, paths, appCtx, ccExtensionManager.getCompilationProvider(SQLPP), getStatementExecutorFactory(), componentProvider); case Servlets.SQLPP_UPDATE: - return new UpdateApiServlet(ctx, paths, appCtx, ccExtensionManager.getSqlppCompilationProvider(), + return new UpdateApiServlet(ctx, paths, appCtx, ccExtensionManager.getCompilationProvider(SQLPP), getStatementExecutorFactory(), componentProvider); case Servlets.SQLPP_DDL: - return new DdlApiServlet(ctx, paths, appCtx, 
ccExtensionManager.getSqlppCompilationProvider(), + return new DdlApiServlet(ctx, paths, appCtx, ccExtensionManager.getCompilationProvider(SQLPP), getStatementExecutorFactory(), componentProvider); case Servlets.RUNNING_REQUESTS: return new QueryCancellationServlet(ctx, paths); @@ -271,8 +278,9 @@ public class CCApplication extends BaseCCApplication { case Servlets.QUERY_RESULT: return new QueryResultApiServlet(ctx, paths, appCtx); case Servlets.QUERY_SERVICE: - return new QueryServiceServlet(ctx, paths, appCtx, ccExtensionManager.getSqlppCompilationProvider(), - getStatementExecutorFactory(), componentProvider); + return new QueryServiceServlet(ctx, paths, appCtx, SQLPP, + ccExtensionManager.getCompilationProvider(SQLPP), getStatementExecutorFactory(), + componentProvider); case Servlets.CONNECTOR: return new ConnectorApiServlet(ctx, paths, appCtx); case Servlets.SHUTDOWN: @@ -292,13 +300,17 @@ public class CCApplication extends BaseCCApplication { } } - private IStatementExecutorFactory getStatementExecutorFactory() { + public IStatementExecutorFactory getStatementExecutorFactory() { return ccExtensionManager.getStatementExecutorFactory(ccServiceCtx.getControllerService().getExecutor()); } + public IStatementExecutorContext getStatementExecutorContext() { + return statementExecutorCtx; + } + @Override public void startupCompleted() throws Exception { - ccServiceCtx.getControllerService().getExecutor().submit((Callable) () -> { + ccServiceCtx.getControllerService().getExecutor().submit(() -> { ClusterStateManager.INSTANCE.waitForState(ClusterState.ACTIVE); ClusterManagerProvider.getClusterManager().notifyStartupCompleted(); return null; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4671f712/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java index 9c24acf..ec01e1c 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java @@ -49,21 +49,26 @@ import org.apache.asterix.transaction.management.resource.PersistentLocalResourc import org.apache.commons.io.FileUtils; import org.apache.hyracks.api.application.INCServiceContext; import org.apache.hyracks.api.application.IServiceContext; +import org.apache.hyracks.api.client.ClusterControllerInfo; +import org.apache.hyracks.api.client.HyracksConnection; +import org.apache.hyracks.api.client.IHyracksClientConnection; import org.apache.hyracks.api.config.IConfigManager; import org.apache.hyracks.api.job.resource.NodeCapacity; import org.apache.hyracks.api.messages.IMessageBroker; import org.apache.hyracks.control.common.controllers.NCConfig; import org.apache.hyracks.control.nc.BaseNCApplication; import org.apache.hyracks.control.nc.NodeControllerService; +import org.apache.hyracks.http.server.WebManager; public class NCApplication extends BaseNCApplication { private static final Logger LOGGER = Logger.getLogger(NCApplication.class.getName()); - private INCServiceContext ncServiceCtx; + protected INCServiceContext ncServiceCtx; private INcApplicationContext runtimeContext; private String nodeId; private boolean stopInitiated = false; private SystemState systemState; + protected WebManager webManager; @Override public void registerConfig(IConfigManager configManager) { @@ -122,6 +127,8 @@ public class NCApplication extends BaseNCApplication { localResourceRepository.initializeNewUniverse(ClusterProperties.INSTANCE.getStorageDirectoryName()); } + webManager = new WebManager(); + performLocalCleanUp(); } @@ -131,6 +138,10 @@ public class NCApplication extends BaseNCApplication { Logger.getLogger("org.apache.asterix").setLevel(level); 
} + protected void configureServers() throws Exception { + // override to start web services on NC nodes + } + protected List<AsterixExtension> getExtensions() { return Collections.emptyList(); } @@ -143,6 +154,9 @@ public class NCApplication extends BaseNCApplication { if (LOGGER.isLoggable(Level.INFO)) { LOGGER.info("Stopping Asterix node controller: " + nodeId); } + + webManager.stop(); + //Clean any temporary files performLocalCleanUp(); @@ -163,6 +177,10 @@ public class NCApplication extends BaseNCApplication { @Override public void startupCompleted() throws Exception { + // configure servlets after joining the cluster, so we can create HyracksClientConnection + configureServers(); + webManager.start(); + // Since we don't pass initial run flag in AsterixHyracksIntegrationUtil, we use the virtualNC flag final NodeProperties nodeProperties = runtimeContext.getNodeProperties(); if (systemState == SystemState.PERMANENT_DATA_LOSS @@ -262,4 +280,10 @@ public class NCApplication extends BaseNCApplication { public INcApplicationContext getApplicationContext() { return runtimeContext; } + + protected IHyracksClientConnection getHcc() throws Exception { + NodeControllerService ncSrv = (NodeControllerService) ncServiceCtx.getControllerService(); + ClusterControllerInfo ccInfo = ncSrv.getNodeParameters().getClusterControllerInfo(); + return new HyracksConnection(ccInfo.getClientNetAddress(), ccInfo.getClientNetPort()); + } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4671f712/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java index 630aabe..33e89f0 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java +++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java @@ -21,6 +21,7 @@ package org.apache.asterix.messaging; import java.io.IOException; import java.nio.ByteBuffer; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicLong; import java.util.logging.Level; import java.util.logging.Logger; @@ -30,12 +31,16 @@ import org.apache.asterix.common.memory.ConcurrentFramePool; import org.apache.asterix.common.messaging.api.ICcAddressedMessage; import org.apache.asterix.common.messaging.api.INCMessageBroker; import org.apache.asterix.common.messaging.api.INcAddressedMessage; +import org.apache.asterix.common.messaging.api.MessageFuture; import org.apache.hyracks.api.comm.IChannelControlBlock; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.messages.IMessage; import org.apache.hyracks.api.util.JavaSerializationUtils; import org.apache.hyracks.control.nc.NodeControllerService; +import io.netty.util.collection.LongObjectHashMap; +import io.netty.util.collection.LongObjectMap; + public class NCMessageBroker implements INCMessageBroker { private static final Logger LOGGER = Logger.getLogger(NCMessageBroker.class.getName()); @@ -44,6 +49,8 @@ public class NCMessageBroker implements INCMessageBroker { private final LinkedBlockingQueue<INcAddressedMessage> receivedMsgsQ; private final ConcurrentFramePool messagingFramePool; private final int maxMsgSize; + private final AtomicLong futureIdGenerator; + private final LongObjectMap<MessageFuture> futureMap; public NCMessageBroker(NodeControllerService ncs, MessagingProperties messagingProperties) { this.ncs = ncs; @@ -53,6 +60,8 @@ public class NCMessageBroker implements INCMessageBroker { messagingFramePool = new ConcurrentFramePool(ncs.getId(), messagingMemoryBudget, messagingProperties.getFrameSize()); receivedMsgsQ = new LinkedBlockingQueue<>(); + futureIdGenerator = new AtomicLong(); + futureMap = new 
LongObjectHashMap<>(); MessageDeliveryService msgDeliverySvc = new MessageDeliveryService(); appContext.getThreadExecutor().execute(msgDeliverySvc); } @@ -104,6 +113,26 @@ public class NCMessageBroker implements INCMessageBroker { ccb.getWriteInterface().getFullBufferAcceptor().accept(msgBuffer); } + @Override + public MessageFuture registerMessageFuture() { + long futureId = futureIdGenerator.incrementAndGet(); + MessageFuture future = new MessageFuture(futureId); + synchronized (futureMap) { + if (futureMap.containsKey(futureId)) { + throw new IllegalStateException(); + } + futureMap.put(futureId, future); + } + return future; + } + + @Override + public MessageFuture deregisterMessageFuture(long futureId) { + synchronized (futureMap) { + return futureMap.remove(futureId); + } + } + private class MessageDeliveryService implements Runnable { /* * TODO Currently this thread is not stopped when it is interrupted because http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4671f712/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java index bffaeef..d1ff871 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java @@ -69,7 +69,7 @@ import org.apache.asterix.runtime.utils.ClusterStateManager; import org.apache.asterix.runtime.utils.RuntimeUtils; import org.apache.asterix.translator.CompiledStatements; import org.apache.asterix.translator.IStatementExecutor; -import org.apache.asterix.translator.SessionConfig; +import org.apache.asterix.translator.SessionOutput; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.tuple.Pair; import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; @@ -151,7 +151,7 @@ public class FeedOperations { return spec; } - private static JobSpecification getConnectionJob(SessionConfig sessionConfig, MetadataProvider metadataProvider, + private static JobSpecification getConnectionJob(SessionOutput sessionOutput, MetadataProvider metadataProvider, FeedConnection feedConnection, String[] locations, ILangCompilationProvider compilationProvider, IStorageComponentProvider storageComponentProvider, DefaultStatementExecutorFactory qtFactory, IHyracksClientConnection hcc) throws AlgebricksException, RemoteException, ACIDException { @@ -165,7 +165,7 @@ public class FeedOperations { statements.add(dataverseDecl); statements.add(subscribeStmt); IStatementExecutor translator = qtFactory.create(metadataProvider.getApplicationContext(), statements, - sessionConfig, compilationProvider, storageComponentProvider); + sessionOutput, compilationProvider, storageComponentProvider); // configure the metadata provider metadataProvider.getConfig().put(FunctionUtil.IMPORT_PRIVATE_FUNCTIONS, "" + Boolean.TRUE); metadataProvider.getConfig().put(FeedActivityDetails.FEED_POLICY_NAME, "" + subscribeStmt.getPolicy()); @@ -357,7 +357,7 @@ public class FeedOperations { } public static Pair<JobSpecification, AlgebricksAbsolutePartitionConstraint> buildStartFeedJob( - SessionConfig sessionConfig, MetadataProvider metadataProvider, Feed feed, + SessionOutput sessionOutput, MetadataProvider metadataProvider, Feed feed, List<FeedConnection> feedConnections, ILangCompilationProvider compilationProvider, IStorageComponentProvider storageComponentProvider, DefaultStatementExecutorFactory qtFactory, IHyracksClientConnection hcc) throws Exception { @@ -372,7 +372,7 @@ public class FeedOperations { String[] ingestionLocations = ingestionAdaptorFactory.getPartitionConstraint().getLocations(); // Add connection job for (FeedConnection feedConnection : feedConnections) { - 
JobSpecification connectionJob = getConnectionJob(sessionConfig, metadataProvider, feedConnection, + JobSpecification connectionJob = getConnectionJob(sessionOutput, metadataProvider, feedConnection, ingestionLocations, compilationProvider, storageComponentProvider, qtFactory, hcc); jobsList.add(connectionJob); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4671f712/asterixdb/asterix-app/src/test/java/org/apache/asterix/aql/translator/QueryTranslatorTest.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/aql/translator/QueryTranslatorTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/aql/translator/QueryTranslatorTest.java index e2885b3..5e3da5f 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/aql/translator/QueryTranslatorTest.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/aql/translator/QueryTranslatorTest.java @@ -39,7 +39,7 @@ import org.apache.asterix.lang.common.base.Statement; import org.apache.asterix.lang.common.statement.RunStatement; import org.apache.asterix.runtime.utils.CcApplicationContext; import org.apache.asterix.translator.IStatementExecutor; -import org.apache.asterix.translator.SessionConfig; +import org.apache.asterix.translator.SessionOutput; import org.junit.Assert; import org.junit.Test; @@ -51,7 +51,7 @@ public class QueryTranslatorTest { @Test public void test() throws Exception { List<Statement> statements = new ArrayList<>(); - SessionConfig mockSessionConfig = mock(SessionConfig.class); + SessionOutput mockSessionOutput = mock(SessionOutput.class); RunStatement mockRunStatement = mock(RunStatement.class); // Mocks AppContextInfo. 
@@ -70,7 +70,7 @@ public class QueryTranslatorTest { when(mockMasterNode.getClientIp()).thenReturn("127.0.0.1"); IStatementExecutor aqlTranslator = new DefaultStatementExecutorFactory().create(mockAsterixAppContextInfo, - statements, mockSessionConfig, new AqlCompilationProvider(), new StorageComponentProvider()); + statements, mockSessionOutput, new AqlCompilationProvider(), new StorageComponentProvider()); List<String> parameters = new ArrayList<>(); parameters.add("examples/pregelix-example-jar-with-dependencies.jar"); parameters.add("org.apache.pregelix.example.PageRankVertex"); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4671f712/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java index f1e6aca..9913493 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java @@ -32,11 +32,13 @@ import java.io.StringWriter; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.net.Inet4Address; +import java.net.InetSocketAddress; import java.net.URI; import java.net.URISyntaxException; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -118,17 +120,24 @@ public class TestExecutor { /* * Instance members */ - protected final String host; - protected final int port; + protected final List<InetSocketAddress> endpoints; + protected int endpointSelector; protected ITestLibrarian librarian; + public TestExecutor() { + 
this(Inet4Address.getLoopbackAddress().getHostAddress(), 19002); + } + public TestExecutor(String host, int port) { - this.host = host; - this.port = port; + this(InetSocketAddress.createUnresolved(host, port)); } - public TestExecutor() { - this(Inet4Address.getLoopbackAddress().getHostAddress(), 19002); + public TestExecutor(InetSocketAddress endpoint) { + this(Collections.singletonList(endpoint)); + } + + public TestExecutor(List<InetSocketAddress> endpoints) { + this.endpoints = endpoints; } public void setLibrarian(ITestLibrarian librarian) { @@ -373,7 +382,7 @@ public class TestExecutor { continue; } throw new Exception( - "Result for " + scriptFile + ": expected pattern '" + expression + "' not found in result."); + "Result for " + scriptFile + ": expected pattern '" + expression + "' not found in result: "+actual); } } catch (Exception e) { System.err.println("Actual results file: " + actualFile.toString()); @@ -811,6 +820,7 @@ public class TestExecutor { break; case "mgx": executeManagixCommand(stripLineComments(statement).trim()); + Thread.sleep(8000); break; case "txnqbc": // qbc represents query before crash InputStream resultStream = executeQuery(statement, OutputFormat.forCompilationUnit(cUnit), @@ -1165,7 +1175,7 @@ public class TestExecutor { protected InputStream executeHttp(String ctxType, String endpoint, OutputFormat fmt) throws Exception { String[] split = endpoint.split("\\?"); - URI uri = new URI("http", null, host, port, split[0], split.length > 1 ? split[1] : null, null); + URI uri = createEndpointURI(split[0], split.length > 1 ? 
split[1] : null); return executeURI(ctxType, uri, fmt); } @@ -1184,7 +1194,7 @@ public class TestExecutor { //get node process id OutputFormat fmt = OutputFormat.forCompilationUnit(cUnit); String endpoint = "/admin/cluster/node/" + nodeId + "/config"; - InputStream executeJSONGet = executeJSONGet(fmt, new URI("http", null, host, port, endpoint, null, null)); + InputStream executeJSONGet = executeJSONGet(fmt, createEndpointURI(endpoint, null)); StringWriter actual = new StringWriter(); IOUtils.copy(executeJSONGet, actual, StandardCharsets.UTF_8); String config = actual.toString(); @@ -1201,7 +1211,7 @@ public class TestExecutor { private void deleteNCTxnLogs(String nodeId, CompilationUnit cUnit) throws Exception { OutputFormat fmt = OutputFormat.forCompilationUnit(cUnit); String endpoint = "/admin/cluster/node/" + nodeId + "/config"; - InputStream executeJSONGet = executeJSONGet(fmt, new URI("http://" + host + ":" + port + endpoint)); + InputStream executeJSONGet = executeJSONGet(fmt, createEndpointURI(endpoint, null)); StringWriter actual = new StringWriter(); IOUtils.copy(executeJSONGet, actual, StandardCharsets.UTF_8); String config = actual.toString(); @@ -1280,8 +1290,16 @@ public class TestExecutor { + cUnit.getName() + "_qbc.adm"); } + protected URI createEndpointURI(String path, String query) throws URISyntaxException { + int endpointIdx = Math.abs(endpointSelector++ % endpoints.size()); + InetSocketAddress endpoint = endpoints.get(endpointIdx); + URI uri = new URI("http", null, endpoint.getHostString(), endpoint.getPort(), path, query, null); + LOGGER.fine("Created endpoint URI: " + uri); + return uri; + } + protected URI getEndpoint(String servlet) throws URISyntaxException { - return new URI("http", null, host, port, getPath(servlet).replaceAll("/\\*$", ""), null, null); + return createEndpointURI(getPath(servlet).replaceAll("/\\*$", ""), null); } public static String stripJavaComments(String text) { 
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4671f712/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/AsyncDeferredQueries.xml ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/AsyncDeferredQueries.xml b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/AsyncDeferredQueries.xml new file mode 100644 index 0000000..9a2ad3c --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/AsyncDeferredQueries.xml @@ -0,0 +1,52 @@ +<!-- + ! Licensed to the Apache Software Foundation (ASF) under one + ! or more contributor license agreements. See the NOTICE file + ! distributed with this work for additional information + ! regarding copyright ownership. The ASF licenses this file + ! to you under the Apache License, Version 2.0 (the + ! "License"); you may not use this file except in compliance + ! with the License. You may obtain a copy of the License at + ! + ! http://www.apache.org/licenses/LICENSE-2.0 + ! + ! Unless required by applicable law or agreed to in writing, + ! software distributed under the License is distributed on an + ! "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + ! KIND, either express or implied. See the License for the + ! specific language governing permissions and limitations + ! under the License. 
+ !--> +<test-group name="async-deferred"> + <test-case FilePath="async-deferred"> + <compilation-unit name="async-failed"> + <output-dir compare="Text">async-failed</output-dir> + <expected-error>Injected failure in asterix:inject-failure</expected-error> + </compilation-unit> + </test-case> + <test-case FilePath="async-deferred"> + <compilation-unit name="async-compilation-failed"> + <output-dir compare="Text">async-compilation-failed</output-dir> + <expected-error>Cannot find dataset gargel</expected-error> + </compilation-unit> + </test-case> + <test-case FilePath="async-deferred"> + <compilation-unit name="deferred"> + <output-dir compare="Text">deferred</output-dir> + </compilation-unit> + </test-case> + <test-case FilePath="async-deferred"> + <compilation-unit name="async"> + <output-dir compare="Text">async</output-dir> + </compilation-unit> + </test-case> + <test-case FilePath="async-deferred"> + <compilation-unit name="async-repeated"> + <output-dir compare="Text">async-repeated</output-dir> + </compilation-unit> + </test-case> + <test-case FilePath="async-deferred"> + <compilation-unit name="async-running"> + <output-dir compare="Text">async-running</output-dir> + </compilation-unit> + </test-case> +</test-group> http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4671f712/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml index 8f7ffc3..d5716d1 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml +++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml @@ -18,43 +18,10 @@ !--> <!DOCTYPE test-suite [ <!ENTITY RecordsQueries SYSTEM "queries_sqlpp/objects/ObjectsQueries.xml"> - + <!ENTITY AsyncDeferredQueries SYSTEM 
"queries_sqlpp/async-deferred/AsyncDeferredQueries.xml"> ]> <test-suite xmlns="urn:xml.testframework.asterix.apache.org" ResultOffsetPath="results" QueryOffsetPath="queries_sqlpp" QueryFileExtension=".sqlpp"> - <test-group name="async-deferred"> - <test-case FilePath="async-deferred"> - <compilation-unit name="async-failed"> - <output-dir compare="Text">async-failed</output-dir> - <expected-error>Injected failure in asterix:inject-failure</expected-error> - </compilation-unit> - </test-case> - <test-case FilePath="async-deferred"> - <compilation-unit name="async-compilation-failed"> - <output-dir compare="Text">async-compilation-failed</output-dir> - <expected-error>Cannot find dataset gargel</expected-error> - </compilation-unit> - </test-case> - <test-case FilePath="async-deferred"> - <compilation-unit name="deferred"> - <output-dir compare="Text">deferred</output-dir> - </compilation-unit> - </test-case> - <test-case FilePath="async-deferred"> - <compilation-unit name="async"> - <output-dir compare="Text">async</output-dir> - </compilation-unit> - </test-case> - <test-case FilePath="async-deferred"> - <compilation-unit name="async-repeated"> - <output-dir compare="Text">async-repeated</output-dir> - </compilation-unit> - </test-case> - <test-case FilePath="async-deferred"> - <compilation-unit name="async-running"> - <output-dir compare="Text">async-running</output-dir> - </compilation-unit> - </test-case> - </test-group> + &AsyncDeferredQueries; <test-group name="flwor"> <test-case FilePath="flwor"> <compilation-unit name="at00"> http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4671f712/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java index a9e6448..e5b78cf 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java @@ -20,6 +20,7 @@ package org.apache.asterix.common.dataflow; import org.apache.asterix.common.api.IApplicationContext; import org.apache.asterix.common.cluster.IGlobalRecoveryManager; +import org.apache.asterix.common.context.IStorageComponentProvider; import org.apache.asterix.common.transactions.IResourceIdManager; import org.apache.hyracks.api.application.ICCServiceContext; import org.apache.hyracks.api.client.IHyracksClientConnection; @@ -72,5 +73,24 @@ public interface ICcApplicationContext extends IApplicationContext { */ public IHyracksClientConnection getHcc(); + /** + * Returns the resource manager + * + * @return {@link IResourceIdManager} implementation instance + */ public IResourceIdManager getResourceIdManager(); + + /** + * Returns the storage component provider + * + * @return {@link IStorageComponentProvider} implementation instance + */ + public IStorageComponentProvider getStorageComponentProvider(); + + /** + * Returns the extension manager + * + * @return the extension manager instance + */ + public Object getExtensionManager(); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4671f712/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/INCMessageBroker.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/INCMessageBroker.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/INCMessageBroker.java index e1101b3..86d1074 100644 --- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/INCMessageBroker.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/INCMessageBroker.java @@ -26,7 +26,6 @@ public interface INCMessageBroker extends IMessageBroker { * Sends application message from this NC to the CC. * * @param message - * @param callback * @throws Exception */ public void sendMessageToCC(ICcAddressedMessage message) throws Exception; @@ -35,7 +34,6 @@ public interface INCMessageBroker extends IMessageBroker { * Sends application message from this NC to another NC. * * @param message - * @param callback * @throws Exception */ public void sendMessageToNC(String nodeId, INcAddressedMessage message) @@ -47,4 +45,17 @@ public interface INCMessageBroker extends IMessageBroker { * @param msg */ public void queueReceivedMessage(INcAddressedMessage msg); + + /** + * Creates and registers a Future for a message that will be sent through this broker + * @return new Future + */ + MessageFuture registerMessageFuture(); + + /** + * Removes a previously registered Future + * @param futureId future identifier + * @return existing Future or {@code null} if there was no Future associated with this identifier + */ + MessageFuture deregisterMessageFuture(long futureId); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4671f712/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/MessageFuture.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/MessageFuture.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/MessageFuture.java new file mode 100644 index 0000000..f4ed1ff --- /dev/null +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/MessageFuture.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one 
+ * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.asterix.common.messaging.api; + +import org.apache.hyracks.api.messages.IMessage; + +import java.util.concurrent.CompletableFuture; + +/** + * A {@link CompletableFuture} associated with an identifier + */ +public class MessageFuture extends CompletableFuture<IMessage> { + + private final long futureId; + + public MessageFuture(long futureId) { + this.futureId = futureId; + } + + public long getFutureId() { + return futureId; + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4671f712/asterixdb/asterix-external-data/pom.xml ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-external-data/pom.xml b/asterixdb/asterix-external-data/pom.xml index d9ca295..e341fef 100644 --- a/asterixdb/asterix-external-data/pom.xml +++ b/asterixdb/asterix-external-data/pom.xml @@ -281,6 +281,10 @@ <groupId>commons-logging</groupId> <artifactId>commons-logging</artifactId> </exclusion> + <exclusion> + <groupId>xerces</groupId> + <artifactId>xercesImpl</artifactId> + </exclusion> </exclusions> </dependency> <dependency> 
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4671f712/asterixdb/asterix-installer/src/test/java/org/apache/asterix/installer/test/AsterixLifecycleIT.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-installer/src/test/java/org/apache/asterix/installer/test/AsterixLifecycleIT.java b/asterixdb/asterix-installer/src/test/java/org/apache/asterix/installer/test/AsterixLifecycleIT.java index 3c65a83..814e109 100644 --- a/asterixdb/asterix-installer/src/test/java/org/apache/asterix/installer/test/AsterixLifecycleIT.java +++ b/asterixdb/asterix-installer/src/test/java/org/apache/asterix/installer/test/AsterixLifecycleIT.java @@ -88,6 +88,7 @@ public class AsterixLifecycleIT { AsterixInstallerIntegrationUtil.transformIntoRequiredState(State.ACTIVE); String command = "stop -n " + AsterixInstallerIntegrationUtil.ASTERIX_INSTANCE_NAME; cmdHandler.processCommand(command.split(" ")); + Thread.sleep(4000); AsterixInstance instance = ServiceProvider.INSTANCE.getLookupService() .getAsterixInstance(AsterixInstallerIntegrationUtil.ASTERIX_INSTANCE_NAME); AsterixRuntimeState state = VerificationUtil.getAsterixRuntimeState(instance); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4671f712/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java index 8608d68..a4c271c 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java @@ -34,6 +34,7 @@ import org.apache.asterix.common.config.PropertiesAccessor; import 
org.apache.asterix.common.config.ReplicationProperties; import org.apache.asterix.common.config.StorageProperties; import org.apache.asterix.common.config.TransactionProperties; +import org.apache.asterix.common.context.IStorageComponentProvider; import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.asterix.common.exceptions.AsterixException; import org.apache.asterix.common.library.ILibraryManager; @@ -54,6 +55,7 @@ import org.apache.hyracks.storage.common.IStorageManager; public class CcApplicationContext implements ICcApplicationContext { private ICCServiceContext ccServiceCtx; + private IStorageComponentProvider storageComponentProvider; private IGlobalRecoveryManager globalRecoveryManager; private ILibraryManager libraryManager; private IResourceIdManager resourceIdManager; @@ -77,7 +79,8 @@ public class CcApplicationContext implements ICcApplicationContext { public CcApplicationContext(ICCServiceContext ccServiceCtx, IHyracksClientConnection hcc, ILibraryManager libraryManager, IResourceIdManager resourceIdManager, Supplier<IMetadataBootstrap> metadataBootstrapSupplier, IGlobalRecoveryManager globalRecoveryManager, - IFaultToleranceStrategy ftStrategy, IJobLifecycleListener activeLifeCycleListener) + IFaultToleranceStrategy ftStrategy, IJobLifecycleListener activeLifeCycleListener, + IStorageComponentProvider storageComponentProvider) throws AsterixException, IOException { this.ccServiceCtx = ccServiceCtx; this.hcc = hcc; @@ -102,6 +105,7 @@ public class CcApplicationContext implements ICcApplicationContext { this.nodeProperties = new NodeProperties(propertiesAccessor); this.metadataBootstrapSupplier = metadataBootstrapSupplier; this.globalRecoveryManager = globalRecoveryManager; + this.storageComponentProvider = storageComponentProvider; } @Override @@ -174,6 +178,7 @@ public class CcApplicationContext implements ICcApplicationContext { return libraryManager; } + @Override public Object getExtensionManager() { return 
extensionManager; } @@ -213,4 +218,9 @@ public class CcApplicationContext implements ICcApplicationContext { public IJobLifecycleListener getActiveLifecycleListener() { return activeLifeCycleListener; } + + @Override + public IStorageComponentProvider getStorageComponentProvider() { + return storageComponentProvider; + } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4671f712/asterixdb/asterix-server/src/main/assembly/filter.properties ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-server/src/main/assembly/filter.properties b/asterixdb/asterix-server/src/main/assembly/filter.properties index b01047f..69c5b02 100644 --- a/asterixdb/asterix-server/src/main/assembly/filter.properties +++ b/asterixdb/asterix-server/src/main/assembly/filter.properties @@ -21,4 +21,5 @@ CC_COMMAND=asterixcc NC_COMMAND=asterixnc HELPER_COMMAND=asterixhelper LISTEN_PORT=19002 -PRODUCT=AsterixDB \ No newline at end of file +PRODUCT=AsterixDB +NC_BLUE_EXTRA= \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4671f712/asterixdb/asterix-server/src/main/opt/local/conf/cc.conf ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-server/src/main/opt/local/conf/cc.conf b/asterixdb/asterix-server/src/main/opt/local/conf/cc.conf index d1f03bc..184728d 100644 --- a/asterixdb/asterix-server/src/main/opt/local/conf/cc.conf +++ b/asterixdb/asterix-server/src/main/opt/local/conf/cc.conf @@ -25,6 +25,7 @@ ncservice.port=9091 txn.log.dir=data/blue/txnlog core.dump.dir=data/blue/coredump iodevices=data/blue +${NC_BLUE_EXTRA} [nc] storage.subdir=storage http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4671f712/asterixdb/asterix-test-framework/src/main/java/org/apache/asterix/testframework/xml/TestSuiteParser.java ---------------------------------------------------------------------- diff --git 
a/asterixdb/asterix-test-framework/src/main/java/org/apache/asterix/testframework/xml/TestSuiteParser.java b/asterixdb/asterix-test-framework/src/main/java/org/apache/asterix/testframework/xml/TestSuiteParser.java index f34dc6a..ebd945e 100644 --- a/asterixdb/asterix-test-framework/src/main/java/org/apache/asterix/testframework/xml/TestSuiteParser.java +++ b/asterixdb/asterix-test-framework/src/main/java/org/apache/asterix/testframework/xml/TestSuiteParser.java @@ -20,14 +20,29 @@ package org.apache.asterix.testframework.xml; import java.io.File; +import javax.xml.XMLConstants; import javax.xml.bind.JAXBContext; +import javax.xml.bind.Unmarshaller; +import javax.xml.parsers.SAXParser; +import javax.xml.parsers.SAXParserFactory; +import javax.xml.transform.sax.SAXSource; + +import org.xml.sax.InputSource; public class TestSuiteParser { public TestSuiteParser() { } public org.apache.asterix.testframework.xml.TestSuite parse(File testSuiteCatalog) throws Exception { + SAXParserFactory saxParserFactory = SAXParserFactory.newInstance(); + saxParserFactory.setNamespaceAware(true); + saxParserFactory.setXIncludeAware(true); + SAXParser saxParser = saxParserFactory.newSAXParser(); + saxParser.setProperty(XMLConstants.ACCESS_EXTERNAL_DTD, "file"); + JAXBContext ctx = JAXBContext.newInstance(org.apache.asterix.testframework.xml.TestSuite.class); - return (org.apache.asterix.testframework.xml.TestSuite) ctx.createUnmarshaller().unmarshal(testSuiteCatalog); + Unmarshaller um = ctx.createUnmarshaller(); + return (org.apache.asterix.testframework.xml.TestSuite) um.unmarshal(new SAXSource(saxParser.getXMLReader(), + new InputSource(testSuiteCatalog.toURI().toString()))); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4671f712/asterixdb/asterix-yarn/pom.xml ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-yarn/pom.xml b/asterixdb/asterix-yarn/pom.xml index 0fed1bb..b06c4b5 100644 --- a/asterixdb/asterix-yarn/pom.xml 
+++ b/asterixdb/asterix-yarn/pom.xml @@ -212,7 +212,6 @@ <usedDependency>org.apache.httpcomponents:httpclient</usedDependency> <usedDependency>org.apache.httpcomponents:httpcore</usedDependency> <usedDependency>org.slf4j:slf4j-simple</usedDependency> - <usedDependency>xerces:xercesImpl</usedDependency> </usedDependencies> <ignoredUnusedDeclaredDependencies> <ignoredUnusedDeclaredDependency>org.apache.asterix:asterix-external-data:zip:*</ignoredUnusedDeclaredDependency> @@ -388,11 +387,6 @@ <scope>test</scope> </dependency> <dependency> - <groupId>xerces</groupId> - <artifactId>xercesImpl</artifactId> - <scope>test</scope> - </dependency> - <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-minicluster</artifactId> <scope>test</scope> http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4671f712/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java index cdfc492..ca20f4a 100644 --- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java +++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java @@ -47,6 +47,8 @@ public class HttpServer { // Constants private static final int LOW_WRITE_BUFFER_WATER_MARK = 8 * 1024; private static final int HIGH_WRITE_BUFFER_WATER_MARK = 32 * 1024; + protected static final WriteBufferWaterMark WRITE_BUFFER_WATER_MARK = + new WriteBufferWaterMark(LOW_WRITE_BUFFER_WATER_MARK, HIGH_WRITE_BUFFER_WATER_MARK); private static final Logger LOGGER = Logger.getLogger(HttpServer.class.getName()); private static final int FAILED = -1; private static final int STOPPED = 0; @@ -192,8 +194,7 @@ public class 
HttpServer { Collections.sort(servlets, (l1, l2) -> l2.getPaths()[0].length() - l1.getPaths()[0].length()); ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class) - .childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, - new WriteBufferWaterMark(LOW_WRITE_BUFFER_WATER_MARK, HIGH_WRITE_BUFFER_WATER_MARK)) + .childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, WRITE_BUFFER_WATER_MARK) .handler(new LoggingHandler(LogLevel.DEBUG)).childHandler(new HttpServerInitializer(this)); channel = b.bind(port).sync().channel(); } @@ -225,7 +226,6 @@ public class HttpServer { } } } - LOGGER.warning("No servlet for " + uri); return null; } @@ -254,7 +254,15 @@ public class HttpServer { return b && (path.length() == cpl || '/' == path.charAt(cpl)); } + protected HttpServerHandler createHttpHandler(int chunkSize) { + return new HttpServerHandler<>(this, chunkSize); + } + public ExecutorService getExecutor() { return executor; } + + protected EventLoopGroup getWorkerGroup() { + return workerGroup; + } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4671f712/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerHandler.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerHandler.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerHandler.java index 6f7ccba..00b3cb6 100644 --- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerHandler.java +++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerHandler.java @@ -33,15 +33,16 @@ import io.netty.channel.SimpleChannelInboundHandler; import io.netty.handler.codec.http.DefaultHttpResponse; import io.netty.handler.codec.http.FullHttpRequest; import 
io.netty.handler.codec.http.HttpResponseStatus; +import io.netty.handler.codec.http.HttpVersion; -public class HttpServerHandler extends SimpleChannelInboundHandler<Object> { +public class HttpServerHandler<T extends HttpServer> extends SimpleChannelInboundHandler<Object> { private static final Logger LOGGER = Logger.getLogger(HttpServerHandler.class.getName()); - protected final HttpServer server; + protected final T server; protected final int chunkSize; protected HttpRequestHandler handler; - public HttpServerHandler(HttpServer server, int chunkSize) { + public HttpServerHandler(T server, int chunkSize) { this.server = server; this.chunkSize = chunkSize; } @@ -65,18 +66,18 @@ public class HttpServerHandler extends SimpleChannelInboundHandler<Object> { try { IServlet servlet = server.getServlet(request); if (servlet == null) { - respond(ctx, request, HttpResponseStatus.NOT_FOUND); + handleServletNotFound(ctx, request); } else { submit(ctx, servlet, request); } } catch (Exception e) { LOGGER.log(Level.SEVERE, "Failure Submitting HTTP Request", e); - respond(ctx, request, HttpResponseStatus.INTERNAL_SERVER_ERROR); + respond(ctx, request.protocolVersion(), HttpResponseStatus.INTERNAL_SERVER_ERROR); } } - private void respond(ChannelHandlerContext ctx, FullHttpRequest request, HttpResponseStatus status) { - DefaultHttpResponse response = new DefaultHttpResponse(request.protocolVersion(), status); + protected void respond(ChannelHandlerContext ctx, HttpVersion httpVersion, HttpResponseStatus status) { + DefaultHttpResponse response = new DefaultHttpResponse(httpVersion, status); ctx.write(response).addListener(ChannelFutureListener.CLOSE); } @@ -86,7 +87,7 @@ public class HttpServerHandler extends SimpleChannelInboundHandler<Object> { servletRequest = HttpUtil.toServletRequest(request); } catch (IllegalArgumentException e) { LOGGER.log(Level.WARNING, "Failure Decoding Request", e); - respond(ctx, request, HttpResponseStatus.BAD_REQUEST); + respond(ctx, 
request.protocolVersion(), HttpResponseStatus.BAD_REQUEST); return; } handler = new HttpRequestHandler(ctx, servlet, servletRequest, chunkSize); @@ -102,6 +103,13 @@ public class HttpServerHandler extends SimpleChannelInboundHandler<Object> { } } + protected void handleServletNotFound(ChannelHandlerContext ctx, FullHttpRequest request) { + if (LOGGER.isLoggable(Level.WARNING)) { + LOGGER.warning("No servlet for " + request.uri()); + } + respond(ctx, request.protocolVersion(), HttpResponseStatus.NOT_FOUND); + } + @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { LOGGER.log(Level.SEVERE, "Failure handling HTTP Request", cause); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4671f712/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerInitializer.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerInitializer.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerInitializer.java index bc67865..4f8655f 100644 --- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerInitializer.java +++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerInitializer.java @@ -44,6 +44,6 @@ public class HttpServerInitializer extends ChannelInitializer<SocketChannel> { MAX_REQUEST_CHUNK_SIZE)); p.addLast(new HttpResponseEncoder()); p.addLast(new HttpObjectAggregator(Integer.MAX_VALUE)); - p.addLast(new HttpServerHandler(server, RESPONSE_CHUNK_SIZE)); + p.addLast(server.createHttpHandler(RESPONSE_CHUNK_SIZE)); } }