abdullah alamoudi has uploaded a new change for review. https://asterix-gerrit.ics.uci.edu/1961
Change subject: [ASTERIXDB-2058][HYR] Only Complete job cancellation after cleanup ...................................................................... [ASTERIXDB-2058][HYR] Only Complete job cancellation after cleanup Change-Id: I14b4bbd512cc88e489254d8bf82edba0fd3a3db5 --- M asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SessionConfig.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ApiServlet.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java R asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/NCQueryServiceServlet.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/api/java/AsterixJavaClient.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java A asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryRequest.java A asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryResponse.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementResponseMessage.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FlushDatasetUtil.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/JobUtils.java M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java M hyracks-fullstack/algebricks/algebricks-tests/src/test/java/org/apache/hyracks/algebricks/tests/util/AlgebricksHyracksIntegrationUtil.java M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java M hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/dataset/HyracksDatasetDirectoryServiceInterfaceRemoteProxy.java M hyracks-fullstack/hyracks/hyracks-client/src/test/java/org/apache/hyracks/client/stats/HyracksUtils.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/IJobManager.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/CancelJobWork.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobCleanupWork.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/job/JobManagerTest.java A hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/work/DummyCallback.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/work/IResultCallback.java M hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/org/apache/hyracks/examples/btree/client/InsertPipelineExample.java M hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/org/apache/hyracks/examples/btree/client/PrimaryIndexBulkLoadExample.java M hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/org/apache/hyracks/examples/btree/client/PrimaryIndexSearchExample.java M hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/org/apache/hyracks/examples/btree/client/SecondaryIndexBulkLoadExample.java M hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/org/apache/hyracks/examples/btree/client/SecondaryIndexSearchExample.java M hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractIntegrationTest.java M hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java M hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/JobStatusAPIIntegrationTest.java M hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/PredistributedJobsTest.java M hyracks-fullstack/hyracks/hyracks-examples/text-example/textclient/src/main/java/org/apache/hyracks/examples/text/client/WordCountMain.java M hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Groupby.java M hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Join.java M hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Sort.java M hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs/dataflow/DataflowTest.java M hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs/utils/HyracksUtils.java M hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs2/dataflow/DataflowTest.java M hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java M hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/RPCInterface.java M hyracks-fullstack/hyracks/hyracks-ipc/src/test/java/org/apache/hyracks/ipc/tests/IPCTest.java 53 files changed, 552 insertions(+), 308 deletions(-) git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/61/1961/1 diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SessionConfig.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SessionConfig.java index cfd4e87..b09d139 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SessionConfig.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SessionConfig.java @@ -111,18 +111,20 @@ private final boolean executeQuery; private final boolean generateJobSpec; private final boolean optimize; + private final long timeout; // Flags. private final Map<String, Boolean> flags; public SessionConfig(OutputFormat fmt) { - this(fmt, true, true, true); + this(fmt, true, true, true, Long.MAX_VALUE); } /** * Create a SessionConfig object with all optional values set to defaults: * - All format flags set to "false". * - All out-of-band outputs set to "false". + * * @param fmt * Output format for execution output. * @param optimize @@ -132,12 +134,14 @@ * @param generateJobSpec * Whether to generate the Hyracks job specification (if */ - public SessionConfig(OutputFormat fmt, boolean optimize, boolean executeQuery, boolean generateJobSpec) { + public SessionConfig(OutputFormat fmt, boolean optimize, boolean executeQuery, boolean generateJobSpec, + long timeout) { this.fmt = fmt; this.optimize = optimize; this.executeQuery = executeQuery; this.generateJobSpec = generateJobSpec; this.flags = new HashMap<>(); + this.timeout = timeout; } /** @@ -203,4 +207,8 @@ Boolean value = flags.get(flag); return value == null ? false : value.booleanValue(); } + + public long getTimeout() { + return timeout; + } } diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java index 583302b..f31e993 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java @@ -356,13 +356,13 @@ return spec; } - public void executeJobArray(IHyracksClientConnection hcc, JobSpecification[] specs, PrintWriter out) + public void executeJobArray(IHyracksClientConnection hcc, JobSpecification[] specs, PrintWriter out, long timeout) throws Exception { for (JobSpecification spec : specs) { spec.setMaxReattempts(0); JobId jobId = hcc.startJob(spec); long startTime = System.currentTimeMillis(); - hcc.waitForCompletion(jobId); + hcc.waitForCompletion(jobId, timeout); long endTime = System.currentTimeMillis(); double duration = (endTime - startTime) / 1000.00; out.println("<pre>Duration: " + duration + " sec</pre>"); @@ -370,7 +370,8 @@ } - public void executeJobArray(IHyracksClientConnection hcc, Job[] jobs, PrintWriter out) throws Exception { + public void executeJobArray(IHyracksClientConnection hcc, Job[] jobs, PrintWriter out, long timeout) + throws Exception { for (Job job : jobs) { job.getJobSpec().setMaxReattempts(0); long startTime = System.currentTimeMillis(); @@ -379,7 +380,7 @@ if (job.getSubmissionMode() == SubmissionMode.ASYNCHRONOUS) { continue; } - hcc.waitForCompletion(jobId); + hcc.waitForCompletion(jobId, timeout); } catch (Exception e) { e.printStackTrace(); continue; diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ApiServlet.java index 7874aa3..e7b7435 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ApiServlet.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ApiServlet.java @@ -138,7 +138,7 @@ } IParser parser = parserFactory.createParser(query); List<Statement> aqlStatements = parser.parse(); - SessionConfig sessionConfig = new SessionConfig(format, true, isSet(executeQuery), true); + SessionConfig sessionConfig = new SessionConfig(format, true, isSet(executeQuery), true, Long.MAX_VALUE); sessionConfig.set(SessionConfig.FORMAT_HTML, true); sessionConfig.set(SessionConfig.FORMAT_CSV_HEADER, csvAndHeader); sessionConfig.set(SessionConfig.FORMAT_WRAPPER_ARRAY, isSet(wrapperArray)); @@ -150,8 +150,8 @@ compilationProvider, componentProvider); double duration; long startTime = System.currentTimeMillis(); - translator.compileAndExecute(hcc, hds, IStatementExecutor.ResultDelivery.IMMEDIATE, - null, new IStatementExecutor.Stats()); + translator.compileAndExecute(hcc, hds, IStatementExecutor.ResultDelivery.IMMEDIATE, null, + new IStatementExecutor.Stats()); long endTime = System.currentTimeMillis(); duration = (endTime - startTime) / 1000.00; out.println(HTML_STATEMENT_SEPARATOR); diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java index 1f1d282..e39b63f 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java @@ -121,7 +121,7 @@ } } - private enum Attribute { + protected enum Attribute { HEADER("header"), LOSSLESS("lossless"); @@ -182,14 +182,14 @@ } } - static class RequestParameters { - String host; - String path; - String statement; - String format; - boolean pretty; - String clientContextID; - String mode; + protected static class RequestParameters { + public String host; + public String path; + public String statement; + public String format; + public boolean pretty; + public String clientContextID; + public String mode; @Override public String toString() { @@ -209,7 +209,7 @@ } } - private static String getParameterValue(String content, String attribute) { + protected static String getParameterValue(String content, String attribute) { if (content == null || attribute == null) { return null; } @@ -231,7 +231,7 @@ return s != null ? s.toLowerCase() : s; } - private static SessionConfig.OutputFormat getFormat(String format) { + protected static SessionConfig.OutputFormat getFormat(String format) { if (format != null) { if (format.startsWith(HttpUtil.ContentType.CSV)) { return SessionConfig.OutputFormat.CSV; @@ -247,8 +247,7 @@ return SessionConfig.OutputFormat.CLEAN_JSON; } - private static SessionOutput createSessionOutput(RequestParameters param, String handleUrl, - PrintWriter resultWriter) { + protected SessionOutput createSessionOutput(RequestParameters param, String handleUrl, PrintWriter resultWriter) { SessionOutput.ResultDecorator resultPrefix = ResultUtil.createPreResultDecorator(); SessionOutput.ResultDecorator resultPostfix = ResultUtil.createPostResultDecorator(); SessionOutput.ResultAppender appendHandle = ResultUtil.createResultHandleAppender(handleUrl); diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/NCQueryServiceServlet.java similarity index 62% rename from asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java rename to asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/NCQueryServiceServlet.java index 9547514..4f4221e 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/NCQueryServiceServlet.java @@ -17,14 +17,18 @@ * under the License. */ -package org.apache.asterix.api.http.server; +package org.apache.asterix.api.http.servlet; +import java.io.PrintWriter; +import java.util.UUID; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeoutException; import java.util.logging.Level; -import io.netty.handler.codec.http.HttpResponseStatus; import org.apache.asterix.algebra.base.ILangExtension; +import org.apache.asterix.api.http.server.QueryServiceServlet; +import org.apache.asterix.api.http.server.ResultUtil; +import org.apache.asterix.app.message.CancelQueryRequest; import org.apache.asterix.app.message.ExecuteStatementRequestMessage; import org.apache.asterix.app.message.ExecuteStatementResponseMessage; import org.apache.asterix.app.result.ResultReader; @@ -34,6 +38,7 @@ import org.apache.asterix.common.messaging.api.MessageFuture; import org.apache.asterix.om.types.ARecordType; import org.apache.asterix.translator.IStatementExecutor; +import org.apache.asterix.translator.SessionConfig; import org.apache.asterix.translator.SessionOutput; import org.apache.commons.lang3.tuple.Triple; import org.apache.hyracks.api.application.INCServiceContext; @@ -41,11 +46,14 @@ import org.apache.hyracks.api.job.JobId; import org.apache.hyracks.ipc.exceptions.IPCException; +import io.netty.handler.codec.http.HttpResponseStatus; + /** * Query service servlet that can run on NC nodes. * Delegates query execution to CC, then serves the result. */ public class NCQueryServiceServlet extends QueryServiceServlet { + public NCQueryServiceServlet(ConcurrentMap<String, Object> ctx, String[] paths, IApplicationContext appCtx, ILangExtension.Language queryLanguage) { super(ctx, paths, appCtx, queryLanguage, null, null, null); @@ -63,13 +71,23 @@ ExecuteStatementResponseMessage responseMsg; MessageFuture responseFuture = ncMb.registerMessageFuture(); try { + if (param.clientContextID == null) { + param.clientContextID = UUID.randomUUID().toString(); + } ExecuteStatementRequestMessage requestMsg = new ExecuteStatementRequestMessage(ncCtx.getNodeId(), responseFuture.getFutureId(), queryLanguage, statementsText, sessionOutput.config(), ccDelivery, param.clientContextID, handleUrl); outExecStartEnd[0] = System.nanoTime(); ncMb.sendMessageToCC(requestMsg); - responseMsg = (ExecuteStatementResponseMessage) responseFuture.get( - ExecuteStatementResponseMessage.DEFAULT_TIMEOUT_MILLIS, java.util.concurrent.TimeUnit.MILLISECONDS); + try { + responseMsg = (ExecuteStatementResponseMessage) responseFuture.get( + ExecuteStatementRequestMessage.DEFAULT_NC_TIMEOUT_MILLIS, + java.util.concurrent.TimeUnit.MILLISECONDS); + } catch (TimeoutException exception) { + // cancel query + cancelQuery(ncMb, ncCtx.getNodeId(), param.clientContextID, exception); + throw exception; + } outExecStartEnd[1] = System.nanoTime(); } finally { ncMb.deregisterMessageFuture(responseFuture.getFutureId()); @@ -97,6 +115,39 @@ } } + private void cancelQuery(INCMessageBroker messageBroker, String nodeId, String clientContextID, + TimeoutException exception) { + MessageFuture cancelQueryFuture = messageBroker.registerMessageFuture(); + CancelQueryRequest cancelQueryMessage = + new CancelQueryRequest(nodeId, cancelQueryFuture.getFutureId(), clientContextID); + try { + messageBroker.sendMessageToCC(cancelQueryMessage); + cancelQueryFuture.get(ExecuteStatementRequestMessage.DEFAULT_QUERY_CANCELLATION_TIMEOUT_MILLIS, + java.util.concurrent.TimeUnit.MILLISECONDS); + } catch (Exception e) { + exception.addSuppressed(e); + } + } + + @Override + protected SessionOutput createSessionOutput(RequestParameters param, String handleUrl, PrintWriter resultWriter) { + SessionOutput.ResultDecorator resultPrefix = ResultUtil.createPreResultDecorator(); + SessionOutput.ResultDecorator resultPostfix = ResultUtil.createPostResultDecorator(); + SessionOutput.ResultAppender appendHandle = ResultUtil.createResultHandleAppender(handleUrl); + SessionOutput.ResultAppender appendStatus = ResultUtil.createResultStatusAppender(); + + SessionConfig.OutputFormat format = QueryServiceServlet.getFormat(param.format); + SessionConfig sessionConfig = + new SessionConfig(format, true, true, true, ExecuteStatementRequestMessage.DEFAULT_JOB_TIMEOUT_MILLIS); + sessionConfig.set(SessionConfig.FORMAT_WRAPPER_ARRAY, true); + sessionConfig.set(SessionConfig.FORMAT_INDENT_JSON, param.pretty); + sessionConfig.set(SessionConfig.FORMAT_QUOTE_RECORD, + format != SessionConfig.OutputFormat.CLEAN_JSON && format != SessionConfig.OutputFormat.LOSSLESS_JSON); + sessionConfig.set(SessionConfig.FORMAT_CSV_HEADER, format == SessionConfig.OutputFormat.CSV + && "present".equals(QueryServiceServlet.getParameterValue(param.format, Attribute.HEADER.str()))); + return new SessionOutput(sessionConfig, resultWriter, resultPrefix, resultPostfix, appendHandle, appendStatus); + } + @Override protected HttpResponseStatus handleExecuteStatementException(Throwable t) { if (t instanceof IPCException || t instanceof TimeoutException) { diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/java/AsterixJavaClient.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/java/AsterixJavaClient.java index a9d24b9..d6bbb6e 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/java/AsterixJavaClient.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/java/AsterixJavaClient.java @@ -100,7 +100,7 @@ List<Statement> statements = parser.parse(); MetadataManager.INSTANCE.init(); - SessionConfig conf = new SessionConfig(OutputFormat.ADM, optimize, true, generateBinaryRuntime); + SessionConfig conf = new SessionConfig(OutputFormat.ADM, optimize, true, generateBinaryRuntime, Long.MAX_VALUE); conf.setOOBData(false, printRewrittenExpressions, printLogicalPlan, printOptimizedPlan, printJob); if (printPhysicalOpsOnly) { conf.set(SessionConfig.FORMAT_ONLY_PHYSICAL_OPS, true); @@ -109,17 +109,17 @@ IStatementExecutor translator = statementExecutorFactory.create(appCtx, statements, output, compilationProvider, storageComponentProvider); - translator.compileAndExecute(hcc, null, QueryTranslator.ResultDelivery.IMMEDIATE, - null, new IStatementExecutor.Stats()); + translator.compileAndExecute(hcc, null, QueryTranslator.ResultDelivery.IMMEDIATE, null, + new IStatementExecutor.Stats()); writer.flush(); } public void execute() throws Exception { if (dmlJobs != null) { - apiFramework.executeJobArray(hcc, dmlJobs, writer); + apiFramework.executeJobArray(hcc, dmlJobs, writer, Long.MAX_VALUE); } if (queryJobSpec != null) { - apiFramework.executeJobArray(hcc, new JobSpecification[] { queryJobSpec }, writer); + apiFramework.executeJobArray(hcc, new JobSpecification[] { queryJobSpec }, writer, Long.MAX_VALUE); } } diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java index 124e56e..e01b1c5 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java @@ -97,7 +97,7 @@ // We will need to design general exception handling mechanism for feeds. setLocations(jobInfo.getRight()); boolean wait = Boolean.parseBoolean(mdProvider.getConfig().get(StartFeedStatement.WAIT_FOR_COMPLETION)); - JobUtils.runJob(hcc, feedJob, false); + JobUtils.runJob(hcc, feedJob); eventSubscriber.sync(); if (wait) { IActiveEntityEventSubscriber stoppedSubscriber = new WaitForStateSubscriber(this, diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryRequest.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryRequest.java new file mode 100644 index 0000000..fb6ec37 --- /dev/null +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryRequest.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.app.message; + +import org.apache.asterix.common.dataflow.ICcApplicationContext; +import org.apache.asterix.common.messaging.api.ICcAddressedMessage; +import org.apache.asterix.hyracks.bootstrap.CCApplication; +import org.apache.asterix.messaging.CCMessageBroker; +import org.apache.asterix.translator.IStatementExecutorContext; +import org.apache.hyracks.api.client.IHyracksClientConnection; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.job.JobId; +import org.apache.hyracks.control.cc.ClusterControllerService; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; + +public class CancelQueryRequest implements ICcAddressedMessage { + + private static final Logger LOGGER = Logger.getLogger(CancelQueryRequest.class.getName()); + private static final long serialVersionUID = 1L; + private final String nodeId; + private final long reqId; + private final String contextId; + + public CancelQueryRequest(String nodeId, long reqId, String contextId) { + this.nodeId = nodeId; + this.reqId = reqId; + this.contextId = contextId; + } + + @Override + public void handle(ICcApplicationContext appCtx) throws HyracksDataException, InterruptedException { + ClusterControllerService ccs = (ClusterControllerService) appCtx.getServiceContext().getControllerService(); + CCApplication application = (CCApplication) ccs.getApplication(); + IStatementExecutorContext executorsCtx = application.getStatementExecutorContext(); + JobId jobId = executorsCtx.getJobIdFromClientContextId(contextId); + + if (jobId == null) { + LOGGER.log(Level.WARN, "No job found for context id " + contextId); + } else { + try { + IHyracksClientConnection hcc = application.getHcc(); + hcc.cancelJob(jobId); + executorsCtx.removeJobIdFromClientContextId(contextId); + } catch (Exception e) { + LOGGER.log(Level.WARN, "unexpected exception thrown from cancel", e); + } + } + CancelQueryResponse response = new CancelQueryResponse(reqId); + CCMessageBroker messageBroker = (CCMessageBroker) appCtx.getServiceContext().getMessageBroker(); + try { + messageBroker.sendApplicationMessageToNC(response, nodeId); + } catch (Exception e) { + LOGGER.log(Level.WARN, "Failure sending response to nc", e); + } + } + +} diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryResponse.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryResponse.java new file mode 100644 index 0000000..4fbcf22 --- /dev/null +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryResponse.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.app.message; + +import org.apache.asterix.common.api.INcApplicationContext; +import org.apache.asterix.common.messaging.api.INcAddressedMessage; +import org.apache.asterix.common.messaging.api.MessageFuture; +import org.apache.asterix.messaging.NCMessageBroker; +import org.apache.hyracks.api.exceptions.HyracksDataException; + +public class CancelQueryResponse implements INcAddressedMessage { + + private static final long serialVersionUID = 1L; + private final long reqId; + + public CancelQueryResponse(long reqId) { + this.reqId = reqId; + } + + @Override + public void handle(INcApplicationContext appCtx) throws HyracksDataException, InterruptedException { + NCMessageBroker mb = (NCMessageBroker) appCtx.getServiceContext().getMessageBroker(); + MessageFuture future = mb.deregisterMessageFuture(reqId); + if (future != null) { + future.complete(this); + } + } +} diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java index e7919fa..e53cc83 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java @@ -22,6 +22,7 @@ import java.io.PrintWriter; import java.io.StringWriter; import java.util.List; +import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; @@ -54,23 +55,17 @@ public final class ExecuteStatementRequestMessage implements ICcAddressedMessage { private static final long serialVersionUID = 1L; - private static final Logger LOGGER = Logger.getLogger(ExecuteStatementRequestMessage.class.getName()); - + public static final long DEFAULT_NC_TIMEOUT_MILLIS = TimeUnit.MINUTES.toMillis(6); + public static final long DEFAULT_JOB_TIMEOUT_MILLIS = TimeUnit.MINUTES.toMillis(5); + public static final long DEFAULT_QUERY_CANCELLATION_TIMEOUT_MILLIS = TimeUnit.MINUTES.toMillis(1); private final String requestNodeId; - private final long requestMessageId; - private final ILangExtension.Language lang; - private final String statementsText; - private final SessionConfig sessionConfig; - private final IStatementExecutor.ResultDelivery delivery; - private final String clientContextID; - private final String handleUrl; public ExecuteStatementRequestMessage(String requestNodeId, long requestMessageId, ILangExtension.Language lang, @@ -102,47 +97,41 @@ IStorageComponentProvider storageComponentProvider = ccAppCtx.getStorageComponentProvider(); IStatementExecutorFactory statementExecutorFactory = ccApp.getStatementExecutorFactory(); IStatementExecutorContext statementExecutorContext = ccApp.getStatementExecutorContext(); - - ccSrv.getExecutor().submit(() -> { - ExecuteStatementResponseMessage responseMsg = new ExecuteStatementResponseMessage(requestMessageId); - try { - IParser parser = compilationProvider.getParserFactory().createParser(statementsText); - List<Statement> statements = parser.parse(); - StringWriter outWriter = new StringWriter(256); - PrintWriter outPrinter = new PrintWriter(outWriter); - SessionOutput.ResultDecorator resultPrefix = ResultUtil.createPreResultDecorator(); - SessionOutput.ResultDecorator resultPostfix = ResultUtil.createPostResultDecorator(); - SessionOutput.ResultAppender appendHandle = ResultUtil.createResultHandleAppender(handleUrl); - SessionOutput.ResultAppender appendStatus = ResultUtil.createResultStatusAppender(); - SessionOutput sessionOutput = new SessionOutput(sessionConfig, outPrinter, resultPrefix, resultPostfix, - appendHandle, appendStatus); - - IStatementExecutor.ResultMetadata outMetadata = new IStatementExecutor.ResultMetadata(); - - MetadataManager.INSTANCE.init(); - IStatementExecutor translator = statementExecutorFactory.create(ccAppCtx, statements, sessionOutput, - compilationProvider, storageComponentProvider); - translator.compileAndExecute(ccAppCtx.getHcc(), null, delivery, outMetadata, - new IStatementExecutor.Stats(), clientContextID, statementExecutorContext); - - outPrinter.close(); - responseMsg.setResult(outWriter.toString()); - responseMsg.setMetadata(outMetadata); - } catch (AlgebricksException | HyracksException | TokenMgrError - | org.apache.asterix.aqlplus.parser.TokenMgrError pe) { - // we trust that "our" exceptions are serializable and have a comprehensible error message - GlobalConfig.ASTERIX_LOGGER.log(Level.WARNING, pe.getMessage(), pe); - responseMsg.setError(pe); - } catch (Exception e) { - GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE, "Unexpected exception", e); - responseMsg.setError(new Exception(e.toString())); - } - try { - messageBroker.sendApplicationMessageToNC(responseMsg, requestNodeId); - } catch (Exception e) { - LOGGER.log(Level.WARNING, e.toString(), e); - } - }); + ExecuteStatementResponseMessage responseMsg = new ExecuteStatementResponseMessage(requestMessageId); + try { + IParser parser = compilationProvider.getParserFactory().createParser(statementsText); + List<Statement> statements = parser.parse(); + StringWriter outWriter = new StringWriter(256); + PrintWriter outPrinter = new PrintWriter(outWriter); + SessionOutput.ResultDecorator resultPrefix = ResultUtil.createPreResultDecorator(); + SessionOutput.ResultDecorator resultPostfix = ResultUtil.createPostResultDecorator(); + SessionOutput.ResultAppender appendHandle = ResultUtil.createResultHandleAppender(handleUrl); + SessionOutput.ResultAppender appendStatus = ResultUtil.createResultStatusAppender(); + SessionOutput sessionOutput = new SessionOutput(sessionConfig, outPrinter, resultPrefix, resultPostfix, + appendHandle, appendStatus); + IStatementExecutor.ResultMetadata outMetadata = new IStatementExecutor.ResultMetadata(); + MetadataManager.INSTANCE.init(); + IStatementExecutor translator = statementExecutorFactory.create(ccAppCtx, statements, sessionOutput, + compilationProvider, storageComponentProvider); + translator.compileAndExecute(ccAppCtx.getHcc(), null, delivery, outMetadata, new IStatementExecutor.Stats(), + clientContextID, statementExecutorContext); + outPrinter.close(); + responseMsg.setResult(outWriter.toString()); + responseMsg.setMetadata(outMetadata); + } catch (AlgebricksException | HyracksException | TokenMgrError + | org.apache.asterix.aqlplus.parser.TokenMgrError pe) { + // we trust that "our" exceptions are serializable and have a comprehensible error message + GlobalConfig.ASTERIX_LOGGER.log(Level.WARNING, pe.getMessage(), pe); + responseMsg.setError(pe); + } catch (Exception e) { + GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE, "Unexpected exception", e); + responseMsg.setError(new Exception(e.toString())); + } + try { + messageBroker.sendApplicationMessageToNC(responseMsg, requestNodeId); + } catch (Exception e) { + LOGGER.log(Level.WARNING, e.toString(), e); + } } private String getRejectionReason(ClusterControllerService ccSrv) { diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementResponseMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementResponseMessage.java index 4f9aa0c..54f0a4e 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementResponseMessage.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementResponseMessage.java @@ -19,8 +19,6 @@ package org.apache.asterix.app.message; -import java.util.concurrent.TimeUnit; - import org.apache.asterix.common.api.INcApplicationContext; import org.apache.asterix.common.messaging.api.INcAddressedMessage; import org.apache.asterix.common.messaging.api.MessageFuture; @@ -30,8 +28,6 @@ public final class ExecuteStatementResponseMessage implements INcAddressedMessage { private static final long serialVersionUID = 1L; - - public static final long DEFAULT_TIMEOUT_MILLIS = TimeUnit.MINUTES.toMillis(5); private final long requestMessageId; diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java index b97c014..81210d5 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java @@ -267,8 +267,8 @@ */ String threadName = Thread.currentThread().getName(); Thread.currentThread().setName(QueryTranslator.class.getSimpleName()); - Map<String, String> config = new HashMap<>(); try { + Map<String, String> config = new HashMap<>(); for (Statement stmt : statements) { if (sessionConfig.is(SessionConfig.FORMAT_HTML)) { sessionOutput.out().println(ApiServlet.HTML_STATEMENT_SEPARATOR); @@ -925,7 +925,7 @@ "Failed to create job spec for replicating Files Index For external dataset"); } filesIndexReplicated = true; - runJob(hcc, spec, jobFlags); + runJob(hcc, spec, jobFlags, Long.MAX_VALUE); } } @@ -956,7 +956,7 @@ bActiveTxn = false; progress = ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA; // #. create the index artifact in NC. - runJob(hcc, spec, jobFlags); + runJob(hcc, spec, jobFlags, Long.MAX_VALUE); // #. flush the internal dataset for correlated policy if (ds.isCorrelated() && ds.getDatasetType() == DatasetType.INTERNAL) { @@ -972,7 +972,7 @@ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); bActiveTxn = false; - runJob(hcc, spec, jobFlags); + runJob(hcc, spec, jobFlags, Long.MAX_VALUE); // #. begin new metadataTxn mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); @@ -1009,7 +1009,7 @@ ExternalIndexingOperations.buildDropFilesIndexJobSpec(metadataProvider, ds); MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); bActiveTxn = false; - runJob(hcc, jobSpec, jobFlags); + runJob(hcc, jobSpec, jobFlags, Long.MAX_VALUE); } catch (Exception e2) { e.addSuppressed(e2); if (bActiveTxn) { @@ -1028,7 +1028,7 @@ JobSpecification jobSpec = IndexUtil.buildDropIndexJobSpec(index, metadataProvider, ds); MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); bActiveTxn = false; - runJob(hcc, jobSpec, jobFlags); + runJob(hcc, jobSpec, jobFlags, Long.MAX_VALUE); } catch (Exception e2) { e.addSuppressed(e2); if (bActiveTxn) { @@ -1346,7 +1346,8 @@ // remove the all indexes in NC try { for (JobSpecification jobSpec : jobsToExecute) { - JobUtils.runJob(hcc, jobSpec, true); + JobId jobId = JobUtils.runJob(hcc, jobSpec); + hcc.waitForCompletion(jobId, Long.MAX_VALUE); } } catch (Exception e2) { // do no throw exception since still the metadata needs to be compensated. @@ -2340,7 +2341,7 @@ final ResultReader resultReader = new ResultReader(hdc, id, resultSetId); ResultUtil.printResults(appCtx, resultReader, sessionOutput, stats, metadataProvider.findOutputRecordType()); - }, clientContextId, ctx); + }, clientContextId, ctx, sessionConfig.getTimeout()); break; case DEFERRED: createAndRunJob(hcc, jobFlags, null, compiler, locker, resultDelivery, id -> { @@ -2349,7 +2350,7 @@ outMetadata.getResultSets() .add(Triple.of(id, resultSetId, metadataProvider.findOutputRecordType())); } - }, clientContextId, ctx); + }, clientContextId, ctx, sessionConfig.getTimeout()); break; default: break; @@ -2369,7 +2370,7 @@ printed.setTrue(); printed.notify(); } - }, clientContextId, ctx); + }, clientContextId, ctx, sessionConfig.getTimeout()); } catch (Exception e) { if (JobId.INVALID.equals(jobId.getValue())) { // compilation failed @@ -2390,24 +2391,25 @@ } private void runJob(IHyracksClientConnection hcc, JobSpecification jobSpec) throws Exception { - runJob(hcc, jobSpec, jobFlags); + runJob(hcc, jobSpec, jobFlags, sessionOutput.config().getTimeout()); } - private static void runJob(IHyracksClientConnection hcc, JobSpecification jobSpec, EnumSet<JobFlag> jobFlags) - throws Exception { - JobUtils.runJob(hcc, jobSpec, jobFlags, true); + private static void runJob(IHyracksClientConnection hcc, JobSpecification jobSpec, EnumSet<JobFlag> jobFlags, + long timeout) throws Exception { + JobId jobId = JobUtils.runJob(hcc, jobSpec, jobFlags); + hcc.waitForCompletion(jobId, timeout); } private static void createAndRunJob(IHyracksClientConnection hcc, EnumSet<JobFlag> jobFlags, Mutable<JobId> jId, IStatementCompiler compiler, IMetadataLocker locker, ResultDelivery resultDelivery, IResultPrinter printer, - String clientContextId, IStatementExecutorContext ctx) throws Exception { + String clientContextId, IStatementExecutorContext ctx, long timeout) throws Exception { locker.lock(); try { final JobSpecification jobSpec = compiler.compile(); if (jobSpec == null) { return; } - final JobId jobId = JobUtils.runJob(hcc, jobSpec, jobFlags, false); + final JobId jobId = JobUtils.runJob(hcc, jobSpec, jobFlags); if (ctx != null && clientContextId != null) { ctx.put(clientContextId, jobId); // Adds the running job into the context. } @@ -2416,9 +2418,9 @@ } if (ResultDelivery.ASYNC == resultDelivery) { printer.print(jobId); - hcc.waitForCompletion(jobId); + hcc.waitForCompletion(jobId, timeout); } else { - hcc.waitForCompletion(jobId); + hcc.waitForCompletion(jobId, timeout); printer.print(jobId); } } finally { diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java index e8636c8..b1a3235 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java @@ -106,6 +106,7 @@ protected WebManager webManager; protected CcApplicationContext appCtx; private IJobCapacityController jobCapacityController; + private IHyracksClientConnection hcc; @Override public void start(IServiceContext serviceCtx, String[] args) throws Exception { @@ -121,9 +122,11 @@ if (LOGGER.isLoggable(Level.INFO)) { LOGGER.info("Starting Asterix cluster controller"); } - ccServiceCtx.setThreadFactory( new AsterixThreadFactory(ccServiceCtx.getThreadFactory(), new LifeCycleComponentManager())); + String strIP = ccServiceCtx.getCCContext().getClusterControllerInfo().getClientNetAddress(); + int port = ccServiceCtx.getCCContext().getClusterControllerInfo().getClientNetPort(); + hcc = new HyracksConnection(strIP, port); ILibraryManager libraryManager = new ExternalLibraryManager(); ResourceIdManager resourceIdManager = new ResourceIdManager(); IReplicationStrategy repStrategy = ClusterProperties.INSTANCE.getReplicationStrategy(); @@ -357,9 +360,7 @@ return appCtx; } - protected IHyracksClientConnection getHcc() throws Exception { - String strIP = ccServiceCtx.getCCContext().getClusterControllerInfo().getClientNetAddress(); - int port = ccServiceCtx.getCCContext().getClusterControllerInfo().getClientNetPort(); - return new HyracksConnection(strIP, port); + public IHyracksClientConnection getHcc() throws Exception { + return hcc; } } diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java index 3209557..40b6c90 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java @@ -77,7 +77,7 @@ private void executeHyracksJob(JobSpecification spec) throws Exception { spec.setMaxReattempts(0); JobId jobId = hcc.startJob(spec); - hcc.waitForCompletion(jobId); + hcc.waitForCompletion(jobId, Long.MAX_VALUE); } @Override diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FlushDatasetUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FlushDatasetUtil.java index 958444c..d9ddb4f 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FlushDatasetUtil.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FlushDatasetUtil.java @@ -35,6 +35,7 @@ import org.apache.hyracks.api.client.IHyracksClientConnection; import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer; import org.apache.hyracks.api.dataflow.value.RecordDescriptor; +import org.apache.hyracks.api.job.JobId; import org.apache.hyracks.api.job.JobSpecification; import org.apache.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor; import org.apache.hyracks.dataflow.std.file.IFileSplitProvider; @@ -60,13 +61,13 @@ new IPushRuntimeFactory[] { new EmptyTupleSourceRuntimeFactory() }, rDescs); org.apache.asterix.common.transactions.JobId jobId = JobIdFactory.generateJobId(); - FlushDatasetOperatorDescriptor flushOperator = new FlushDatasetOperatorDescriptor(spec, jobId, - dataset.getDatasetId()); + FlushDatasetOperatorDescriptor flushOperator = + new FlushDatasetOperatorDescriptor(spec, jobId, dataset.getDatasetId()); spec.connect(new OneToOneConnectorDescriptor(spec), emptySource, 0, flushOperator, 0); - Pair<IFileSplitProvider, AlgebricksPartitionConstraint> primarySplitsAndConstraint = metadataProvider - .getSplitProviderAndConstraints(dataset, dataset.getDatasetName()); + Pair<IFileSplitProvider, AlgebricksPartitionConstraint> primarySplitsAndConstraint = + metadataProvider.getSplitProviderAndConstraints(dataset, dataset.getDatasetName()); AlgebricksPartitionConstraint primaryPartitionConstraint = primarySplitsAndConstraint.second; AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, emptySource, @@ -74,7 +75,8 @@ JobEventListenerFactory jobEventListenerFactory = new JobEventListenerFactory(jobId, true); spec.setJobletEventListenerFactory(jobEventListenerFactory); - JobUtils.runJob(hcc, spec, true); + JobId hyracksJobId = JobUtils.runJob(hcc, spec); + hcc.waitForCompletion(hyracksJobId, Long.MAX_VALUE); } } diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java index ceaf4cf..62ca32c 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java @@ -114,9 +114,9 @@ if (!targetNcNames.isEmpty()) { // Creates a node group for rebalance. - String nodeGroupName = DatasetUtil - .createNodeGroupForNewDataset(sourceDataset.getDataverseName(), sourceDataset.getDatasetName(), - sourceDataset.getRebalanceCount() + 1, targetNcNames, metadataProvider); + String nodeGroupName = DatasetUtil.createNodeGroupForNewDataset(sourceDataset.getDataverseName(), + sourceDataset.getDatasetName(), sourceDataset.getRebalanceCount() + 1, targetNcNames, + metadataProvider); // The target dataset for rebalance. targetDataset = sourceDataset.getTargetDatasetForRebalance(nodeGroupName); @@ -266,7 +266,8 @@ private static void createRebalanceTarget(Dataset target, MetadataProvider metadataProvider, IHyracksClientConnection hcc) throws Exception { JobSpecification spec = DatasetUtil.createDatasetJobSpec(target, metadataProvider); - JobUtils.runJob(hcc, spec, true); + org.apache.hyracks.api.job.JobId jobId = JobUtils.runJob(hcc, spec); + hcc.waitForCompletion(jobId, Long.MAX_VALUE); } // Populates the data from the source dataset to the rebalance target dataset. @@ -303,7 +304,8 @@ spec.connect(new OneToOneConnectorDescriptor(spec), upsertOp, 0, commitOp, 0); // Executes the job. - JobUtils.runJob(hcc, spec, true); + org.apache.hyracks.api.job.JobId hyracksJobId = JobUtils.runJob(hcc, spec); + hcc.waitForCompletion(hyracksJobId, Long.MAX_VALUE); } // Creates the primary index upsert operator for populating the target dataset. @@ -341,7 +343,8 @@ jobs.add(IndexUtil.buildDropIndexJobSpec(index, metadataProvider, dataset, true)); } for (JobSpecification jobSpec : jobs) { - JobUtils.runJob(hcc, jobSpec, true); + org.apache.hyracks.api.job.JobId jobId = JobUtils.runJob(hcc, jobSpec); + hcc.waitForCompletion(jobId, Long.MAX_VALUE); } } @@ -355,12 +358,14 @@ // Creates the secondary index. JobSpecification indexCreationJobSpec = IndexUtil.buildSecondaryIndexCreationJobSpec(target, index, metadataProvider); - JobUtils.runJob(hcc, indexCreationJobSpec, true); + org.apache.hyracks.api.job.JobId jobId = JobUtils.runJob(hcc, indexCreationJobSpec); + hcc.waitForCompletion(jobId, Long.MAX_VALUE); // Loads the secondary index. JobSpecification indexLoadingJobSpec = IndexUtil.buildSecondaryIndexLoadingJobSpec(target, index, metadataProvider); - JobUtils.runJob(hcc, indexLoadingJobSpec, true); + jobId = JobUtils.runJob(hcc, indexLoadingJobSpec); + hcc.waitForCompletion(jobId, Long.MAX_VALUE); } } @@ -393,9 +398,9 @@ dropDatasetFiles(dataset, metadataProvider, hcc); // drop dataset entry from metadata - runMetadataTransaction(metadataProvider, () -> MetadataManager.INSTANCE - .dropDataset(metadataProvider.getMetadataTxnContext(), dataset.getDataverseName(), - dataset.getDatasetName())); + runMetadataTransaction(metadataProvider, + () -> MetadataManager.INSTANCE.dropDataset(metadataProvider.getMetadataTxnContext(), + dataset.getDataverseName(), dataset.getDatasetName())); // try to drop the dataset's node group runMetadataTransaction(metadataProvider, () -> tryDropDatasetNodegroup(dataset, metadataProvider)); }); diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/JobUtils.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/JobUtils.java index cacbfbc..c60ebac 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/JobUtils.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/JobUtils.java @@ -33,18 +33,13 @@ ADDED_PENDINGOP_RECORD_TO_METADATA } - public static JobId runJob(IHyracksClientConnection hcc, JobSpecification spec, boolean waitForCompletion) - throws Exception { - return runJob(hcc, spec, EnumSet.noneOf(JobFlag.class), waitForCompletion); + public static JobId runJob(IHyracksClientConnection hcc, JobSpecification spec) throws Exception { + return runJob(hcc, spec, EnumSet.noneOf(JobFlag.class)); } - public static JobId runJob(IHyracksClientConnection hcc, JobSpecification spec, EnumSet<JobFlag> jobFlags, - boolean waitForCompletion) throws Exception { + public static JobId runJob(IHyracksClientConnection hcc, JobSpecification spec, EnumSet<JobFlag> jobFlags) + throws Exception { spec.setMaxReattempts(0); - final JobId jobId = hcc.startJob(spec, jobFlags); - if (waitForCompletion) { - hcc.waitForCompletion(jobId); - } - return jobId; + return hcc.startJob(spec, jobFlags); } } diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java index fa60bba..910247b 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java @@ -350,12 +350,14 @@ // # disconnect the feeds for (Pair<JobSpecification, Boolean> p : disconnectJobList.values()) { - JobUtils.runJob(hcc, p.first, true); + org.apache.hyracks.api.job.JobId jobId = JobUtils.runJob(hcc, p.first); + hcc.waitForCompletion(jobId, Long.MAX_VALUE); } // #. run the jobs for (JobSpecification jobSpec : jobsToExecute) { - JobUtils.runJob(hcc, jobSpec, true); + org.apache.hyracks.api.job.JobId jobId = JobUtils.runJob(hcc, jobSpec); + hcc.waitForCompletion(jobId, Long.MAX_VALUE); } mdTxnCtx.setValue(MetadataManager.INSTANCE.beginTransaction()); @@ -389,7 +391,8 @@ // #. run the jobs for (JobSpecification jobSpec : jobsToExecute) { - JobUtils.runJob(hcc, jobSpec, true); + org.apache.hyracks.api.job.JobId jobId = JobUtils.runJob(hcc, jobSpec); + hcc.waitForCompletion(jobId, Long.MAX_VALUE); } if (!indexes.isEmpty()) { ExternalDatasetsRegistry.INSTANCE.removeDatasetInfo(this); diff --git a/hyracks-fullstack/algebricks/algebricks-tests/src/test/java/org/apache/hyracks/algebricks/tests/util/AlgebricksHyracksIntegrationUtil.java b/hyracks-fullstack/algebricks/algebricks-tests/src/test/java/org/apache/hyracks/algebricks/tests/util/AlgebricksHyracksIntegrationUtil.java index 2971b72..d2af495 100644 --- a/hyracks-fullstack/algebricks/algebricks-tests/src/test/java/org/apache/hyracks/algebricks/tests/util/AlgebricksHyracksIntegrationUtil.java +++ b/hyracks-fullstack/algebricks/algebricks-tests/src/test/java/org/apache/hyracks/algebricks/tests/util/AlgebricksHyracksIntegrationUtil.java @@ -65,8 +65,7 @@ ncConfig1.setClusterListenAddress("127.0.0.1"); ncConfig1.setDataListenAddress("127.0.0.1"); ncConfig1.setResultListenAddress("127.0.0.1"); - ncConfig1.setIODevices(new String [] { joinPath(System.getProperty("user.dir"), "target", "data", - "device0") }); + ncConfig1.setIODevices(new String[] { joinPath(System.getProperty("user.dir"), "target", "data", "device0") }); FileUtils.forceMkdir(new File(ncConfig1.getIODevices()[0])); nc1 = new NodeControllerService(ncConfig1); nc1.start(); @@ -77,8 +76,7 @@ ncConfig2.setClusterListenAddress("127.0.0.1"); ncConfig2.setDataListenAddress("127.0.0.1"); ncConfig2.setResultListenAddress("127.0.0.1"); - ncConfig2.setIODevices(new String [] { joinPath(System.getProperty("user.dir"), "target", "data", - "device1") }); + ncConfig2.setIODevices(new String[] { joinPath(System.getProperty("user.dir"), "target", "data", "device1") }); FileUtils.forceMkdir(new File(ncConfig1.getIODevices()[0])); nc2 = new NodeControllerService(ncConfig2); nc2.start(); @@ -96,7 +94,7 @@ AlgebricksConfig.ALGEBRICKS_LOGGER.info(spec.toJSON().toString()); JobId jobId = hcc.startJob(spec, EnumSet.of(JobFlag.PROFILE_RUNTIME)); AlgebricksConfig.ALGEBRICKS_LOGGER.info(jobId.toString()); - hcc.waitForCompletion(jobId); + hcc.waitForCompletion(jobId, Long.MAX_VALUE); } } diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java index e2868ae..15e5952 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java @@ -130,6 +130,9 @@ public CancelJobFunction(JobId jobId) { this.jobId = jobId; + if (jobId == null) { + throw new NullPointerException("jobId"); + } } @Override diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java index 0142c7d..2718ac3 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java @@ -37,6 +37,7 @@ public class HyracksClientInterfaceRemoteProxy implements IHyracksClientInterface { private static final int SHUTDOWN_CONNECTION_TIMEOUT_SECS = 30; + private static final long DEFAULT_TIMEOUT = Long.MAX_VALUE; private final IIPCHandle ipcHandle; @@ -51,112 +52,113 @@ public ClusterControllerInfo getClusterControllerInfo() throws Exception { HyracksClientInterfaceFunctions.GetClusterControllerInfoFunction gccif = new HyracksClientInterfaceFunctions.GetClusterControllerInfoFunction(); - return (ClusterControllerInfo) rpci.call(ipcHandle, gccif); + return (ClusterControllerInfo) rpci.call(ipcHandle, gccif, DEFAULT_TIMEOUT); } @Override public JobStatus getJobStatus(JobId jobId) throws Exception { HyracksClientInterfaceFunctions.GetJobStatusFunction gjsf = new HyracksClientInterfaceFunctions.GetJobStatusFunction(jobId); - return (JobStatus) rpci.call(ipcHandle, gjsf); + return (JobStatus) rpci.call(ipcHandle, gjsf, DEFAULT_TIMEOUT); } @Override public JobId startJob(byte[] acggfBytes, EnumSet<JobFlag> jobFlags) throws Exception { HyracksClientInterfaceFunctions.StartJobFunction sjf = new HyracksClientInterfaceFunctions.StartJobFunction(acggfBytes, jobFlags); - return (JobId) rpci.call(ipcHandle, sjf); + return (JobId) rpci.call(ipcHandle, sjf, DEFAULT_TIMEOUT); } @Override public void cancelJob(JobId jobId) throws Exception { - HyracksClientInterfaceFunctions.CancelJobFunction cjf = new HyracksClientInterfaceFunctions.CancelJobFunction( - jobId); - rpci.call(ipcHandle, cjf); + HyracksClientInterfaceFunctions.CancelJobFunction cjf = + new HyracksClientInterfaceFunctions.CancelJobFunction(jobId); + rpci.call(ipcHandle, cjf, DEFAULT_TIMEOUT); } @Override public JobId startJob(JobId jobId) throws Exception { HyracksClientInterfaceFunctions.StartJobFunction sjf = new HyracksClientInterfaceFunctions.StartJobFunction(jobId); - return (JobId) rpci.call(ipcHandle, sjf); + return (JobId) rpci.call(ipcHandle, sjf, DEFAULT_TIMEOUT); } @Override public JobId startJob(DeploymentId deploymentId, byte[] acggfBytes, EnumSet<JobFlag> jobFlags) throws Exception { - HyracksClientInterfaceFunctions.StartJobFunction sjf = new HyracksClientInterfaceFunctions.StartJobFunction( - deploymentId, acggfBytes, jobFlags); - return (JobId) rpci.call(ipcHandle, sjf); + HyracksClientInterfaceFunctions.StartJobFunction sjf = + new HyracksClientInterfaceFunctions.StartJobFunction(deploymentId, acggfBytes, jobFlags); + return (JobId) rpci.call(ipcHandle, sjf, DEFAULT_TIMEOUT); } @Override public JobId distributeJob(byte[] acggfBytes) throws Exception { HyracksClientInterfaceFunctions.DistributeJobFunction sjf = new HyracksClientInterfaceFunctions.DistributeJobFunction(acggfBytes); - return (JobId) rpci.call(ipcHandle, sjf); + return (JobId) rpci.call(ipcHandle, sjf, DEFAULT_TIMEOUT); } @Override public JobId destroyJob(JobId jobId) throws Exception { HyracksClientInterfaceFunctions.DestroyJobFunction sjf = new HyracksClientInterfaceFunctions.DestroyJobFunction(jobId); - return (JobId) rpci.call(ipcHandle, sjf); + return (JobId) rpci.call(ipcHandle, sjf, DEFAULT_TIMEOUT); } @Override public NetworkAddress getDatasetDirectoryServiceInfo() throws Exception { HyracksClientInterfaceFunctions.GetDatasetDirectoryServiceInfoFunction gddsf = new HyracksClientInterfaceFunctions.GetDatasetDirectoryServiceInfoFunction(); - return (NetworkAddress) rpci.call(ipcHandle, gddsf); + return (NetworkAddress) rpci.call(ipcHandle, gddsf, DEFAULT_TIMEOUT); } @Override - public void waitForCompletion(JobId jobId) throws Exception { + public void waitForCompletion(JobId jobId, long timeout) throws Exception { HyracksClientInterfaceFunctions.WaitForCompletionFunction wfcf = new HyracksClientInterfaceFunctions.WaitForCompletionFunction(jobId); - rpci.call(ipcHandle, wfcf); + rpci.call(ipcHandle, wfcf, timeout); } + @SuppressWarnings("unchecked") @Override public Map<String, NodeControllerInfo> getNodeControllersInfo() throws Exception { HyracksClientInterfaceFunctions.GetNodeControllersInfoFunction gncif = new HyracksClientInterfaceFunctions.GetNodeControllersInfoFunction(); - return (Map<String, NodeControllerInfo>) rpci.call(ipcHandle, gncif); + return (Map<String, NodeControllerInfo>) rpci.call(ipcHandle, gncif, DEFAULT_TIMEOUT); } @Override public ClusterTopology getClusterTopology() throws Exception { HyracksClientInterfaceFunctions.GetClusterTopologyFunction gctf = new HyracksClientInterfaceFunctions.GetClusterTopologyFunction(); - return (ClusterTopology) rpci.call(ipcHandle, gctf); + return (ClusterTopology) rpci.call(ipcHandle, gctf, DEFAULT_TIMEOUT); } @Override public void deployBinary(List<URL> binaryURLs, DeploymentId deploymentId) throws Exception { HyracksClientInterfaceFunctions.CliDeployBinaryFunction dbf = new HyracksClientInterfaceFunctions.CliDeployBinaryFunction(binaryURLs, deploymentId); - rpci.call(ipcHandle, dbf); + rpci.call(ipcHandle, dbf, DEFAULT_TIMEOUT); } @Override public void unDeployBinary(DeploymentId deploymentId) throws Exception { HyracksClientInterfaceFunctions.CliUnDeployBinaryFunction dbf = new HyracksClientInterfaceFunctions.CliUnDeployBinaryFunction(deploymentId); - rpci.call(ipcHandle, dbf); + rpci.call(ipcHandle, dbf, DEFAULT_TIMEOUT); } @Override public JobInfo getJobInfo(JobId jobId) throws Exception { HyracksClientInterfaceFunctions.GetJobInfoFunction gjsf = new HyracksClientInterfaceFunctions.GetJobInfoFunction(jobId); - return (JobInfo) rpci.call(ipcHandle, gjsf); + return (JobInfo) rpci.call(ipcHandle, gjsf, DEFAULT_TIMEOUT); } @Override public void stopCluster(boolean terminateNCService) throws Exception { HyracksClientInterfaceFunctions.ClusterShutdownFunction csdf = new HyracksClientInterfaceFunctions.ClusterShutdownFunction(terminateNCService); - rpci.call(ipcHandle, csdf); + rpci.call(ipcHandle, csdf, DEFAULT_TIMEOUT); int i = 0; // give the CC some time to do final settling after it returns our request while (ipcHandle.isConnected() && i++ < SHUTDOWN_CONNECTION_TIMEOUT_SECS) { @@ -165,8 +167,8 @@ } } if (ipcHandle.isConnected()) { - throw new IPCException("CC refused to release connection after " + SHUTDOWN_CONNECTION_TIMEOUT_SECS - + " seconds"); + throw new IPCException( + "CC refused to release connection after " + SHUTDOWN_CONNECTION_TIMEOUT_SECS + " seconds"); } } @@ -174,13 +176,13 @@ public String getNodeDetailsJSON(String nodeId, boolean includeStats, boolean includeConfig) throws Exception { HyracksClientInterfaceFunctions.GetNodeDetailsJSONFunction gjsf = new HyracksClientInterfaceFunctions.GetNodeDetailsJSONFunction(nodeId, includeStats, includeConfig); - return (String) rpci.call(ipcHandle, gjsf); + return (String) rpci.call(ipcHandle, gjsf, DEFAULT_TIMEOUT); } @Override public String getThreadDump(String node) throws Exception { HyracksClientInterfaceFunctions.ThreadDumpFunction tdf = new HyracksClientInterfaceFunctions.ThreadDumpFunction(node); - return (String)rpci.call(ipcHandle, tdf); + return (String) rpci.call(ipcHandle, tdf, DEFAULT_TIMEOUT); } } diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java index 75cbf61..7b5dfe8 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java @@ -26,6 +26,7 @@ import java.util.List; import java.util.Map; import java.util.UUID; +import java.util.concurrent.TimeoutException; import org.apache.http.HttpResponse; import org.apache.http.client.methods.HttpPut; @@ -124,6 +125,7 @@ return hci.startJob(jobId); } + @Override public JobId startJob(IActivityClusterGraphGeneratorFactory acggf, EnumSet<JobFlag> jobFlags) throws Exception { return hci.startJob(JavaSerializationUtils.serialize(acggf), jobFlags); } @@ -132,15 +134,16 @@ return hci.distributeJob(JavaSerializationUtils.serialize(acggf)); } + @Override public NetworkAddress getDatasetDirectoryServiceInfo() throws Exception { return hci.getDatasetDirectoryServiceInfo(); } @Override - public void waitForCompletion(JobId jobId) throws Exception { + public void waitForCompletion(JobId jobId, long timeout) throws Exception { try { - hci.waitForCompletion(jobId); - } catch (InterruptedException e) { + hci.waitForCompletion(jobId, timeout); + } catch (InterruptedException | TimeoutException e) { // Cancels an on-going job if the current thread gets interrupted. hci.cancelJob(jobId); throw e; @@ -152,7 +155,7 @@ try { return hci.getNodeControllersInfo(); } catch (Exception e) { - throw new HyracksException(e); + throw HyracksException.create(e); } } diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java index 0956d85..49371c4 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java @@ -141,8 +141,7 @@ * JobId of the Job * @throws Exception */ - public void waitForCompletion(JobId jobId) throws Exception; - + public void waitForCompletion(JobId jobId, long timeout) throws Exception; /** * Deploy the user-defined jars to the cluster @@ -201,16 +200,19 @@ /** * Shuts down all NCs and then the CC. + * * @param terminateNCService */ public void stopCluster(boolean terminateNCService) throws Exception; /** * Get details of specified node as JSON object + * * @param nodeId - * id the subject node + * id the subject node * @param includeStats - * @param includeConfig @return serialized JSON containing the node details + * @param includeConfig + * @return serialized JSON containing the node details * @throws Exception */ public String getNodeDetailsJSON(String nodeId, boolean includeStats, boolean includeConfig) throws Exception; diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java index 1afbe9e..7e3715a 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java @@ -48,7 +48,7 @@ public NetworkAddress getDatasetDirectoryServiceInfo() throws Exception; - public void waitForCompletion(JobId jobId) throws Exception; + public void waitForCompletion(JobId jobId, long timeout) throws Exception; public Map<String, NodeControllerInfo> getNodeControllersInfo() throws Exception; diff --git a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/dataset/HyracksDatasetDirectoryServiceInterfaceRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/dataset/HyracksDatasetDirectoryServiceInterfaceRemoteProxy.java index 4310cd0..7abb22f 100644 --- a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/dataset/HyracksDatasetDirectoryServiceInterfaceRemoteProxy.java +++ b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/dataset/HyracksDatasetDirectoryServiceInterfaceRemoteProxy.java @@ -40,16 +40,16 @@ @Override public Status getDatasetResultStatus(JobId jobId, ResultSetId rsId) throws Exception { - HyracksClientInterfaceFunctions.GetDatasetResultStatusFunction gdrlf = new HyracksClientInterfaceFunctions.GetDatasetResultStatusFunction( - jobId, rsId); - return (Status) rpci.call(ipcHandle, gdrlf); + HyracksClientInterfaceFunctions.GetDatasetResultStatusFunction gdrlf = + new HyracksClientInterfaceFunctions.GetDatasetResultStatusFunction(jobId, rsId); + return (Status) rpci.call(ipcHandle, gdrlf, Long.MAX_VALUE); } @Override public DatasetDirectoryRecord[] getDatasetResultLocations(JobId jobId, ResultSetId rsId, DatasetDirectoryRecord[] knownRecords) throws Exception { - HyracksClientInterfaceFunctions.GetDatasetResultLocationsFunction gdrlf = new HyracksClientInterfaceFunctions.GetDatasetResultLocationsFunction( - jobId, rsId, knownRecords); - return (DatasetDirectoryRecord[]) rpci.call(ipcHandle, gdrlf); + HyracksClientInterfaceFunctions.GetDatasetResultLocationsFunction gdrlf = + new HyracksClientInterfaceFunctions.GetDatasetResultLocationsFunction(jobId, rsId, knownRecords); + return (DatasetDirectoryRecord[]) rpci.call(ipcHandle, gdrlf, Long.MAX_VALUE); } } diff --git a/hyracks-fullstack/hyracks/hyracks-client/src/test/java/org/apache/hyracks/client/stats/HyracksUtils.java b/hyracks-fullstack/hyracks/hyracks-client/src/test/java/org/apache/hyracks/client/stats/HyracksUtils.java index 54ae838..1b253d9 100644 --- a/hyracks-fullstack/hyracks/hyracks-client/src/test/java/org/apache/hyracks/client/stats/HyracksUtils.java +++ b/hyracks-fullstack/hyracks/hyracks-client/src/test/java/org/apache/hyracks/client/stats/HyracksUtils.java @@ -94,7 +94,7 @@ public static void runJob(JobSpecification spec, String appName) throws Exception { spec.setFrameSize(FRAME_SIZE); JobId jobId = hcc.startJob(spec, EnumSet.of(JobFlag.PROFILE_RUNTIME)); - hcc.waitForCompletion(jobId); + hcc.waitForCompletion(jobId, Long.MAX_VALUE); } } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java index dbbaf9f..bde9b9b 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java @@ -65,6 +65,8 @@ import org.apache.hyracks.control.cc.work.JobCleanupWork; import org.apache.hyracks.control.common.job.PartitionState; import org.apache.hyracks.control.common.job.TaskAttemptDescriptor; +import org.apache.hyracks.control.common.work.DummyCallback; +import org.apache.hyracks.control.common.work.IResultCallback; public class JobExecutor { private static final Logger LOGGER = Logger.getLogger(JobExecutor.class.getName()); @@ -114,9 +116,10 @@ ccs.getContext().notifyJobStart(jobRun.getJobId()); } - public void cancelJob() throws HyracksException { + public void cancelJob(IResultCallback<Void> callback) throws HyracksException { // If the job is already terminated or failed, do nothing here. if (jobRun.getPendingStatus() != null) { + callback.setValue(null); return; } // Sets the cancelled flag. @@ -124,7 +127,8 @@ // Aborts on-ongoing task clusters. abortOngoingTaskClusters(ta -> false, ta -> null); // Aborts the whole job. - abortJob(Collections.singletonList(HyracksException.create(ErrorCode.JOB_CANCELED, jobRun.getJobId()))); + abortJob(Collections.singletonList(HyracksException.create(ErrorCode.JOB_CANCELED, jobRun.getJobId())), + callback); } private void findRunnableTaskClusterRoots(Set<TaskCluster> frontier, Collection<ActivityCluster> roots) @@ -196,8 +200,8 @@ "Runnable TC roots: " + taskClusterRoots + ", inProgressTaskClusters: " + inProgressTaskClusters); } if (taskClusterRoots.isEmpty() && inProgressTaskClusters.isEmpty()) { - ccs.getWorkQueue() - .schedule(new JobCleanupWork(ccs.getJobManager(), jobRun.getJobId(), JobStatus.TERMINATED, null)); + ccs.getWorkQueue().schedule(new JobCleanupWork(ccs.getJobManager(), jobRun.getJobId(), JobStatus.TERMINATED, + null, DummyCallback.INSTANCE)); return; } startRunnableTaskClusters(taskClusterRoots); @@ -520,14 +524,14 @@ } } - public void abortJob(List<Exception> exceptions) { + public void abortJob(List<Exception> exceptions, IResultCallback<Void> callback) { Set<TaskCluster> inProgressTaskClustersCopy = new HashSet<>(inProgressTaskClusters); for (TaskCluster tc : inProgressTaskClustersCopy) { abortTaskCluster(findLastTaskClusterAttempt(tc), TaskClusterAttempt.TaskClusterStatus.ABORTED); } assert inProgressTaskClusters.isEmpty(); - ccs.getWorkQueue() - .schedule(new JobCleanupWork(ccs.getJobManager(), jobRun.getJobId(), JobStatus.FAILURE, exceptions)); + ccs.getWorkQueue().schedule( + new JobCleanupWork(ccs.getJobManager(), jobRun.getJobId(), JobStatus.FAILURE, exceptions, callback)); } private void abortTaskCluster(TaskClusterAttempt tcAttempt, @@ -686,7 +690,7 @@ + " as failed and the number of max re-attempts = " + maxReattempts); if (lastAttempt.getAttempt() >= maxReattempts || isCancelled()) { LOGGER.log(Level.INFO, "Aborting the job of " + ta.getTaskAttemptId()); - abortJob(exceptions); + abortJob(exceptions, DummyCallback.INSTANCE); return; } LOGGER.log(Level.INFO, "We will try to start runnable activity clusters of " + ta.getTaskAttemptId()); @@ -696,7 +700,7 @@ "Ignoring task failure notification: " + taId + " -- Current last attempt = " + lastAttempt); } } catch (Exception e) { - abortJob(Collections.singletonList(e)); + abortJob(Collections.singletonList(e), DummyCallback.INSTANCE); } } @@ -720,7 +724,7 @@ ta -> HyracksException.create(ErrorCode.NODE_FAILED, ta.getNodeId())); startRunnableActivityClusters(); } catch (Exception e) { - abortJob(Collections.singletonList(e)); + abortJob(Collections.singletonList(e), DummyCallback.INSTANCE); } } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/IJobManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/IJobManager.java index 8fe542f..5ae19a1 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/IJobManager.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/IJobManager.java @@ -25,6 +25,7 @@ import org.apache.hyracks.api.exceptions.HyracksException; import org.apache.hyracks.api.job.JobId; import org.apache.hyracks.api.job.JobStatus; +import org.apache.hyracks.control.common.work.IResultCallback; /** * This interface abstracts the job lifecycle management and job scheduling for a cluster. @@ -46,11 +47,13 @@ /** * Cancel a job with a given job id. + * + * @param callback * * @param jobId, * the id of the job. */ - void cancel(JobId jobId) throws HyracksException; + void cancel(JobId jobId, IResultCallback<Void> callback) throws HyracksException; /** * This method is called when the master process decides to complete job. diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java index abf1d57..117cbe1 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java @@ -46,6 +46,8 @@ import org.apache.hyracks.control.cc.scheduler.FIFOJobQueue; import org.apache.hyracks.control.cc.scheduler.IJobQueue; import org.apache.hyracks.control.common.controllers.CCConfig; +import org.apache.hyracks.control.common.work.DummyCallback; +import org.apache.hyracks.control.common.work.IResultCallback; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; @@ -115,17 +117,14 @@ } @Override - public void cancel(JobId jobId) throws HyracksException { - if (jobId == null) { - return; - } + public void cancel(JobId jobId, IResultCallback<Void> callback) throws HyracksException { // Cancels a running job. if (activeRunMap.containsKey(jobId)) { JobRun jobRun = activeRunMap.get(jobId); // The following call will abort all ongoing tasks and then consequently // trigger JobCleanupWork and JobCleanupNotificationWork which will update the lifecyle of the job. // Therefore, we do not remove the job out of activeRunMap here. - jobRun.getExecutor().cancelJob(); + jobRun.getExecutor().cancelJob(callback); return; } // Removes a pending job. @@ -138,6 +137,7 @@ runMapArchive.put(jobId, jobRun); runMapHistory.put(jobId, exceptions); } + callback.setValue(null); } @Override @@ -322,7 +322,7 @@ // fail the job then abort it run.setStatus(JobStatus.FAILURE, exceptions); // abort job will trigger JobCleanupWork - run.getExecutor().abortJob(exceptions); + run.getExecutor().abortJob(exceptions, DummyCallback.INSTANCE); } } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/CancelJobWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/CancelJobWork.java index f3b67c9..e3135df 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/CancelJobWork.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/CancelJobWork.java @@ -42,10 +42,7 @@ @Override protected void doRun() throws Exception { try { - if (jobId != null) { - jobManager.cancel(jobId); - } - callback.setValue(null); + jobManager.cancel(jobId, callback); } catch (Exception e) { callback.setException(e); } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobCleanupWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobCleanupWork.java index 502ac50..bb85c13 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobCleanupWork.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobCleanupWork.java @@ -29,6 +29,7 @@ import org.apache.hyracks.control.cc.job.IJobManager; import org.apache.hyracks.control.cc.job.JobRun; import org.apache.hyracks.control.common.work.AbstractWork; +import org.apache.hyracks.control.common.work.IResultCallback; public class JobCleanupWork extends AbstractWork { private static final Logger LOGGER = Logger.getLogger(JobCleanupWork.class.getName()); @@ -37,12 +38,15 @@ private JobId jobId; private JobStatus status; private List<Exception> exceptions; + private IResultCallback<Void> callback; - public JobCleanupWork(IJobManager jobManager, JobId jobId, JobStatus status, List<Exception> exceptions) { + public JobCleanupWork(IJobManager jobManager, JobId jobId, JobStatus status, List<Exception> exceptions, + IResultCallback<Void> callback) { this.jobManager = jobManager; this.jobId = jobId; this.status = status; this.exceptions = exceptions; + this.callback = callback; } @Override @@ -53,6 +57,7 @@ try { JobRun jobRun = jobManager.get(jobId); jobManager.prepareComplete(jobRun, status, exceptions); + callback.setValue(null); } catch (HyracksException e) { // Fail the job with the caught exception during final completion. JobRun run = jobManager.get(jobId); @@ -62,6 +67,7 @@ } completionException.add(0, e); run.setStatus(JobStatus.FAILURE, completionException); + callback.setException(e); } } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/job/JobManagerTest.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/job/JobManagerTest.java index ed2a740..f99d465 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/job/JobManagerTest.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/job/JobManagerTest.java @@ -48,6 +48,7 @@ import org.apache.hyracks.control.common.base.INodeController; import org.apache.hyracks.control.common.controllers.CCConfig; import org.apache.hyracks.control.common.logs.LogFile; +import org.apache.hyracks.control.common.work.DummyCallback; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -207,7 +208,7 @@ } @Test - public void testCancel() throws HyracksException { + public void testCancel() throws Exception { CCConfig ccConfig = new CCConfig(); IJobCapacityController jobCapacityController = mock(IJobCapacityController.class); IJobManager jobManager = spy(new JobManager(ccConfig, mockClusterControllerService(), jobCapacityController)); @@ -247,12 +248,12 @@ // Cancels deferred jobs. for (JobRun run : deferredRuns) { - jobManager.cancel(run.getJobId()); + jobManager.cancel(run.getJobId(), DummyCallback.INSTANCE); } // Cancels runnable jobs. for (JobRun run : acceptedRuns) { - jobManager.cancel(run.getJobId()); + jobManager.cancel(run.getJobId(), DummyCallback.INSTANCE); } Assert.assertTrue(jobManager.getPendingJobs().isEmpty()); diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/work/DummyCallback.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/work/DummyCallback.java new file mode 100644 index 0000000..24946f0 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/work/DummyCallback.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.control.common.work; + +public class DummyCallback implements IResultCallback<Void> { + + public static final DummyCallback INSTANCE = new DummyCallback(); + + private DummyCallback() { + } + + @Override + public void setValue(Void result) { + // Dummy is used when no callback is provided + } + + @Override + public void setException(Exception e) { + // Dummy is used when no callback is provided + } + +} diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/work/IResultCallback.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/work/IResultCallback.java index 0ccaf1d..b8eefe7 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/work/IResultCallback.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/work/IResultCallback.java @@ -19,6 +19,7 @@ package org.apache.hyracks.control.common.work; public interface IResultCallback<T> { + public void setValue(T result); public void setException(Exception e); diff --git a/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/org/apache/hyracks/examples/btree/client/InsertPipelineExample.java b/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/org/apache/hyracks/examples/btree/client/InsertPipelineExample.java index 48377e3..4cf1d12 100644 --- a/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/org/apache/hyracks/examples/btree/client/InsertPipelineExample.java +++ b/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/org/apache/hyracks/examples/btree/client/InsertPipelineExample.java @@ -89,7 +89,7 @@ long start = System.currentTimeMillis(); JobId jobId = hcc.startJob(job); - hcc.waitForCompletion(jobId); + hcc.waitForCompletion(jobId, Long.MAX_VALUE); long end = System.currentTimeMillis(); System.err.println(start + " " + end + " " + (end - start)); } @@ -144,7 +144,8 @@ // B-Tree tuple, etc. IFileSplitProvider primarySplitProvider = JobHelper.createFileSplitProvider(splitNCs, options.primaryBTreeName); - IIndexDataflowHelperFactory primaryHelperFactory = new IndexDataflowHelperFactory(storageManager, primarySplitProvider); + IIndexDataflowHelperFactory primaryHelperFactory = + new IndexDataflowHelperFactory(storageManager, primarySplitProvider); // create operator descriptor TreeIndexInsertUpdateDeleteOperatorDescriptor primaryInsert = diff --git a/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/org/apache/hyracks/examples/btree/client/PrimaryIndexBulkLoadExample.java b/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/org/apache/hyracks/examples/btree/client/PrimaryIndexBulkLoadExample.java index 203d22c..dc13c32 100644 --- a/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/org/apache/hyracks/examples/btree/client/PrimaryIndexBulkLoadExample.java +++ b/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/org/apache/hyracks/examples/btree/client/PrimaryIndexBulkLoadExample.java @@ -84,7 +84,7 @@ JobSpecification job = createJob(options); long start = System.currentTimeMillis(); JobId jobId = hcc.startJob(job); - hcc.waitForCompletion(jobId); + hcc.waitForCompletion(jobId, Long.MAX_VALUE); long end = System.currentTimeMillis(); System.err.println(start + " " + end + " " + (end - start)); } @@ -145,7 +145,8 @@ // to field 0 of B-Tree tuple, // etc. IFileSplitProvider btreeSplitProvider = JobHelper.createFileSplitProvider(splitNCs, options.btreeName); - IIndexDataflowHelperFactory dataflowHelperFactory = new IndexDataflowHelperFactory(storageManager, btreeSplitProvider); + IIndexDataflowHelperFactory dataflowHelperFactory = + new IndexDataflowHelperFactory(storageManager, btreeSplitProvider); TreeIndexBulkLoadOperatorDescriptor btreeBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec, recDesc, fieldPermutation, 0.7f, false, 1000L, true, dataflowHelperFactory); diff --git a/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/org/apache/hyracks/examples/btree/client/PrimaryIndexSearchExample.java b/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/org/apache/hyracks/examples/btree/client/PrimaryIndexSearchExample.java index 603dc6b..bf3b41e 100644 --- a/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/org/apache/hyracks/examples/btree/client/PrimaryIndexSearchExample.java +++ b/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/org/apache/hyracks/examples/btree/client/PrimaryIndexSearchExample.java @@ -80,7 +80,7 @@ long start = System.currentTimeMillis(); JobId jobId = hcc.startJob(job); - hcc.waitForCompletion(jobId); + hcc.waitForCompletion(jobId, Long.MAX_VALUE); long end = System.currentTimeMillis(); System.err.println(start + " " + end + " " + (end - start)); } @@ -139,7 +139,8 @@ // into search op IFileSplitProvider btreeSplitProvider = JobHelper.createFileSplitProvider(splitNCs, options.btreeName); - IIndexDataflowHelperFactory dataflowHelperFactory = new IndexDataflowHelperFactory(storageManager, btreeSplitProvider); + IIndexDataflowHelperFactory dataflowHelperFactory = + new IndexDataflowHelperFactory(storageManager, btreeSplitProvider); BTreeSearchOperatorDescriptor btreeSearchOp = new BTreeSearchOperatorDescriptor(spec, recDesc, lowKeyFields, highKeyFields, true, true, dataflowHelperFactory, false, false, null, NoOpOperationCallbackFactory.INSTANCE, null, null, false); diff --git a/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/org/apache/hyracks/examples/btree/client/SecondaryIndexBulkLoadExample.java b/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/org/apache/hyracks/examples/btree/client/SecondaryIndexBulkLoadExample.java index 7507f10..52a0ace 100644 --- a/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/org/apache/hyracks/examples/btree/client/SecondaryIndexBulkLoadExample.java +++ b/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/org/apache/hyracks/examples/btree/client/SecondaryIndexBulkLoadExample.java @@ -83,7 +83,7 @@ long start = System.currentTimeMillis(); JobId jobId = hcc.startJob(job); - hcc.waitForCompletion(jobId); + hcc.waitForCompletion(jobId, Long.MAX_VALUE); long end = System.currentTimeMillis(); System.err.println(start + " " + end + " " + (end - start)); } @@ -117,7 +117,8 @@ // use a disk-order scan to read primary index IFileSplitProvider primarySplitProvider = JobHelper.createFileSplitProvider(splitNCs, options.primaryBTreeName); - IIndexDataflowHelperFactory primaryHelperFactory = new IndexDataflowHelperFactory(storageManager, primarySplitProvider); + IIndexDataflowHelperFactory primaryHelperFactory = + new IndexDataflowHelperFactory(storageManager, primarySplitProvider); TreeIndexDiskOrderScanOperatorDescriptor btreeScanOp = new TreeIndexDiskOrderScanOperatorDescriptor(spec, recDesc, primaryHelperFactory, NoOpOperationCallbackFactory.INSTANCE); JobHelper.createPartitionConstraint(spec, btreeScanOp, splitNCs); @@ -139,7 +140,8 @@ // tuple int[] fieldPermutation = { 1, 0 }; IFileSplitProvider btreeSplitProvider = JobHelper.createFileSplitProvider(splitNCs, options.secondaryBTreeName); - IIndexDataflowHelperFactory secondaryHelperFactory = new IndexDataflowHelperFactory(storageManager, btreeSplitProvider); + IIndexDataflowHelperFactory secondaryHelperFactory = + new IndexDataflowHelperFactory(storageManager, btreeSplitProvider); TreeIndexBulkLoadOperatorDescriptor btreeBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec, null, fieldPermutation, 0.7f, false, 1000L, true, secondaryHelperFactory); JobHelper.createPartitionConstraint(spec, btreeBulkLoad, splitNCs); diff --git a/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/org/apache/hyracks/examples/btree/client/SecondaryIndexSearchExample.java b/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/org/apache/hyracks/examples/btree/client/SecondaryIndexSearchExample.java index 1e909ef..5d72703 100644 --- a/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/org/apache/hyracks/examples/btree/client/SecondaryIndexSearchExample.java +++ b/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/org/apache/hyracks/examples/btree/client/SecondaryIndexSearchExample.java @@ -83,7 +83,7 @@ long start = System.currentTimeMillis(); JobId jobId = hcc.startJob(job); - hcc.waitForCompletion(jobId); + hcc.waitForCompletion(jobId, Long.MAX_VALUE); long end = System.currentTimeMillis(); System.err.println(start + " " + end + " " + (end - start)); } @@ -183,7 +183,8 @@ // op IFileSplitProvider primarySplitProvider = JobHelper.createFileSplitProvider(splitNCs, options.primaryBTreeName); - IIndexDataflowHelperFactory primaryHelperFactory = new IndexDataflowHelperFactory(storageManager, primarySplitProvider); + IIndexDataflowHelperFactory primaryHelperFactory = + new IndexDataflowHelperFactory(storageManager, primarySplitProvider); BTreeSearchOperatorDescriptor primarySearchOp = new BTreeSearchOperatorDescriptor(spec, primaryRecDesc, primaryLowKeyFields, primaryHighKeyFields, true, true, primaryHelperFactory, false, false, null, NoOpOperationCallbackFactory.INSTANCE, null, null, false); diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractIntegrationTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractIntegrationTest.java index 82fd737..48ca36f 100644 --- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractIntegrationTest.java +++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractIntegrationTest.java @@ -106,7 +106,7 @@ ncConfig1.setClusterListenAddress("127.0.0.1"); ncConfig1.setDataListenAddress("127.0.0.1"); ncConfig1.setResultListenAddress("127.0.0.1"); - ncConfig1.setIODevices(new String [] { joinPath(System.getProperty("user.dir"), "target", "data", "device0") }); + ncConfig1.setIODevices(new String[] { joinPath(System.getProperty("user.dir"), "target", "data", "device0") }); nc1 = new NodeControllerService(ncConfig1); nc1.start(); @@ -116,7 +116,7 @@ ncConfig2.setClusterListenAddress("127.0.0.1"); ncConfig2.setDataListenAddress("127.0.0.1"); ncConfig2.setResultListenAddress("127.0.0.1"); - ncConfig2.setIODevices(new String [] { joinPath(System.getProperty("user.dir"), "target", "data", "device1") }); + ncConfig2.setIODevices(new String[] { joinPath(System.getProperty("user.dir"), "target", "data", "device1") }); nc2 = new NodeControllerService(ncConfig2); nc2.start(); @@ -146,7 +146,7 @@ protected void runTest(JobSpecification spec) throws Exception { JobId jobId = executeTest(spec); - hcc.waitForCompletion(jobId); + hcc.waitForCompletion(jobId, Long.MAX_VALUE); } protected List<String> readResults(JobSpecification spec, JobId jobId, ResultSetId resultSetId) throws Exception { @@ -209,7 +209,7 @@ expectedFile.close(); } - hcc.waitForCompletion(jobId); + hcc.waitForCompletion(jobId, Long.MAX_VALUE); return true; } @@ -226,7 +226,7 @@ } output.close(); - hcc.waitForCompletion(jobId); + hcc.waitForCompletion(jobId, Long.MAX_VALUE); } protected FileSplit createFile(NodeControllerService ncs) throws IOException { diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java index 18479e2..1c0220d 100644 --- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java +++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java @@ -127,7 +127,7 @@ } protected void waitForCompletion(JobId jobId) throws Exception { - hcc.waitForCompletion(jobId); + hcc.waitForCompletion(jobId, Long.MAX_VALUE); } protected JobStatus getJobStatus(JobId jobId) throws Exception { @@ -188,7 +188,7 @@ } boolean expectedExceptionThrown = false; try { - hcc.waitForCompletion(jobId); + hcc.waitForCompletion(jobId, Long.MAX_VALUE); } catch (HyracksDataException hde) { if (expectedErrorMessage != null) { if (hde.toString().contains(expectedErrorMessage)) { diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/JobStatusAPIIntegrationTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/JobStatusAPIIntegrationTest.java index f2f8061..c4636fb 100644 --- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/JobStatusAPIIntegrationTest.java +++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/JobStatusAPIIntegrationTest.java @@ -104,7 +104,7 @@ WaitingOperatorDescriptor.CONTINUE_RUNNING.setTrue(); WaitingOperatorDescriptor.CONTINUE_RUNNING.notify(); } - hcc.waitForCompletion(jId); + hcc.waitForCompletion(jId, Long.MAX_VALUE); } private int countJobs(String status) throws IOException, URISyntaxException { diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/PredistributedJobsTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/PredistributedJobsTest.java index 4a01fdb..0c27b3d 100644 --- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/PredistributedJobsTest.java +++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/PredistributedJobsTest.java @@ -79,7 +79,7 @@ ncConfig1.setDataListenAddress("127.0.0.1"); ncConfig1.setResultListenAddress("127.0.0.1"); ncConfig1.setResultSweepThreshold(5000); - ncConfig1.setIODevices(new String [] { joinPath(System.getProperty("user.dir"), "target", "data", "device0") }); + ncConfig1.setIODevices(new String[] { joinPath(System.getProperty("user.dir"), "target", "data", "device0") }); NodeControllerService nc1Base = new NodeControllerService(ncConfig1); nc1 = Mockito.spy(nc1Base); nc1.start(); @@ -91,7 +91,7 @@ ncConfig2.setDataListenAddress("127.0.0.1"); ncConfig2.setResultListenAddress("127.0.0.1"); ncConfig2.setResultSweepThreshold(5000); - ncConfig2.setIODevices(new String [] { joinPath(System.getProperty("user.dir"), "target", "data", "device1") }); + ncConfig2.setIODevices(new String[] { joinPath(System.getProperty("user.dir"), "target", "data", "device1") }); NodeControllerService nc2Base = new NodeControllerService(ncConfig2); nc2 = Mockito.spy(nc2Base); nc2.start(); @@ -127,7 +127,7 @@ //run the first job hcc.startJob(jobId1); - hcc.waitForCompletion(jobId1); + hcc.waitForCompletion(jobId1, Long.MAX_VALUE); //destroy the first job hcc.destroyJob(jobId1); @@ -143,7 +143,7 @@ //run the second job hcc.startJob(jobId2); - hcc.waitForCompletion(jobId2); + hcc.waitForCompletion(jobId2, Long.MAX_VALUE); //wait ten seconds to ensure the result sweeper does not break the job //The result sweeper runs every 5 seconds during the tests @@ -151,7 +151,7 @@ //run the second job again hcc.startJob(jobId2); - hcc.waitForCompletion(jobId2); + hcc.waitForCompletion(jobId2, Long.MAX_VALUE); //destroy the second job hcc.destroyJob(jobId2); diff --git a/hyracks-fullstack/hyracks/hyracks-examples/text-example/textclient/src/main/java/org/apache/hyracks/examples/text/client/WordCountMain.java b/hyracks-fullstack/hyracks/hyracks-examples/text-example/textclient/src/main/java/org/apache/hyracks/examples/text/client/WordCountMain.java index 650c60d..d31be22 100644 --- a/hyracks-fullstack/hyracks/hyracks-examples/text-example/textclient/src/main/java/org/apache/hyracks/examples/text/client/WordCountMain.java +++ b/hyracks-fullstack/hyracks/hyracks-examples/text-example/textclient/src/main/java/org/apache/hyracks/examples/text/client/WordCountMain.java @@ -113,7 +113,7 @@ long start = System.currentTimeMillis(); JobId jobId = hcc.startJob(job, options.runtimeProfiling ? EnumSet.of(JobFlag.PROFILE_RUNTIME) : EnumSet.noneOf(JobFlag.class)); - hcc.waitForCompletion(jobId); + hcc.waitForCompletion(jobId, Long.MAX_VALUE); long end = System.currentTimeMillis(); System.err.println(start + " " + end + " " + (end - start)); } @@ -139,11 +139,11 @@ JobSpecification spec = new JobSpecification(frameSize); IFileSplitProvider splitsProvider = new ConstantFileSplitProvider(inSplits); - RecordDescriptor wordDesc = new RecordDescriptor( - new ISerializerDeserializer[] { new UTF8StringSerializerDeserializer() }); + RecordDescriptor wordDesc = + new RecordDescriptor(new ISerializerDeserializer[] { new UTF8StringSerializerDeserializer() }); - FileScanOperatorDescriptor wordScanner = new FileScanOperatorDescriptor(spec, splitsProvider, - new WordTupleParserFactory(), wordDesc); + FileScanOperatorDescriptor wordScanner = + new FileScanOperatorDescriptor(spec, splitsProvider, new WordTupleParserFactory(), wordDesc); createPartitionConstraint(spec, wordScanner, inSplits); RecordDescriptor groupResultDesc = new RecordDescriptor(new ISerializerDeserializer[] { @@ -170,8 +170,8 @@ PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) })); spec.connect(scanGroupConn, wordScanner, 0, gBy, 0); } else { - IBinaryComparatorFactory[] cfs = new IBinaryComparatorFactory[] { - PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) }; + IBinaryComparatorFactory[] cfs = + new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) }; IOperatorDescriptor sorter = "memsort".equalsIgnoreCase(algo) ? new InMemorySortOperatorDescriptor(spec, keys, new UTF8StringNormalizedKeyComputerFactory(), cfs, wordDesc) @@ -195,9 +195,9 @@ } IFileSplitProvider outSplitProvider = new ConstantFileSplitProvider(outSplits); - IOperatorDescriptor writer = "text".equalsIgnoreCase(format) - ? new PlainFileWriterOperatorDescriptor(spec, outSplitProvider, ",") - : new FrameFileWriterOperatorDescriptor(spec, outSplitProvider); + IOperatorDescriptor writer = + "text".equalsIgnoreCase(format) ? new PlainFileWriterOperatorDescriptor(spec, outSplitProvider, ",") + : new FrameFileWriterOperatorDescriptor(spec, outSplitProvider); createPartitionConstraint(spec, writer, outSplits); IConnectorDescriptor gbyPrinterConn = new OneToOneConnectorDescriptor(spec); diff --git a/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Groupby.java b/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Groupby.java index 42fe8c9..8a86692 100644 --- a/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Groupby.java +++ b/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Groupby.java @@ -118,7 +118,7 @@ System.out.print("CreateJobTime:" + (System.currentTimeMillis() - start)); start = System.currentTimeMillis(); JobId jobId = hcc.startJob(job); - hcc.waitForCompletion(jobId); + hcc.waitForCompletion(jobId, Long.MAX_VALUE); System.out.println("JobExecuteTime:" + (System.currentTimeMillis() - start)); } } @@ -134,8 +134,8 @@ createPartitionConstraint(spec, fileScanner, inSplits); // Output: each unique string with an integer count - RecordDescriptor outDesc = new RecordDescriptor( - new ISerializerDeserializer[] { IntegerSerializerDeserializer.INSTANCE, + RecordDescriptor outDesc = + new RecordDescriptor(new ISerializerDeserializer[] { IntegerSerializerDeserializer.INSTANCE, // IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE }); @@ -187,9 +187,9 @@ spec.connect(scanGroupConnDef2, fileScanner, 0, grouper, 0); IFileSplitProvider outSplitProvider = new ConstantFileSplitProvider(outSplits); - AbstractSingleActivityOperatorDescriptor writer = outPlain ? new PlainFileWriterOperatorDescriptor(spec, - outSplitProvider, "|") - : new FrameFileWriterOperatorDescriptor(spec, outSplitProvider); + AbstractSingleActivityOperatorDescriptor writer = + outPlain ? new PlainFileWriterOperatorDescriptor(spec, outSplitProvider, "|") + : new FrameFileWriterOperatorDescriptor(spec, outSplitProvider); createPartitionConstraint(spec, writer, outSplits); IConnectorDescriptor groupOutConn = new OneToOneConnectorDescriptor(spec); spec.connect(groupOutConn, grouper, 0, writer, 0); diff --git a/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Join.java b/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Join.java index c3d0df1..2a9f351 100644 --- a/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Join.java +++ b/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Join.java @@ -140,7 +140,7 @@ long start = System.currentTimeMillis(); JobId jobId = hcc.startJob(job, options.profile ? EnumSet.of(JobFlag.PROFILE_RUNTIME) : EnumSet.noneOf(JobFlag.class)); - hcc.waitForCompletion(jobId); + hcc.waitForCompletion(jobId, Long.MAX_VALUE); long end = System.currentTimeMillis(); System.err.println(start + " " + end + " " + (end - start)); } diff --git a/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Sort.java b/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Sort.java index 8ab0708..347ade5 100644 --- a/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Sort.java +++ b/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Sort.java @@ -89,13 +89,13 @@ } static int[] SortFields = new int[] { 1, 0 }; - static IBinaryComparatorFactory[] SortFieldsComparatorFactories = new IBinaryComparatorFactory[] { - PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), - PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) }; + static IBinaryComparatorFactory[] SortFieldsComparatorFactories = + new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), + PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) }; - static IBinaryHashFunctionFactory[] orderBinaryHashFunctionFactories = new IBinaryHashFunctionFactory[] { - PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY), - PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }; + static IBinaryHashFunctionFactory[] orderBinaryHashFunctionFactories = + new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY), + PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }; public static void main(String[] args) throws Exception { Options options = new Options(); @@ -109,13 +109,13 @@ IHyracksClientConnection hcc = new HyracksConnection(options.host, options.port); JobSpecification job = createJob(parseFileSplits(options.inFileOrderSplits), - parseFileSplits(options.outFileSplits), - options.memBufferAlg, options.frameLimit, options.frameSize, options.topK, options.usingHeapSorter); + parseFileSplits(options.outFileSplits), options.memBufferAlg, options.frameLimit, options.frameSize, + options.topK, options.usingHeapSorter); long start = System.currentTimeMillis(); JobId jobId = hcc.startJob(job, options.profile ? EnumSet.of(JobFlag.PROFILE_RUNTIME) : EnumSet.noneOf(JobFlag.class)); - hcc.waitForCompletion(jobId); + hcc.waitForCompletion(jobId, Long.MAX_VALUE); long end = System.currentTimeMillis(); System.err.println("finished in:" + (end - start) + "ms"); } @@ -135,8 +135,8 @@ SortFieldsComparatorFactories, ordersDesc); } else { if (memBufferAlg.equalsIgnoreCase("bestfit")) { - sorter = new ExternalSortOperatorDescriptor(spec, frameLimit, SortFields, - null, SortFieldsComparatorFactories, ordersDesc, Algorithm.MERGE_SORT, + sorter = new ExternalSortOperatorDescriptor(spec, frameLimit, SortFields, null, + SortFieldsComparatorFactories, ordersDesc, Algorithm.MERGE_SORT, EnumFreeSlotPolicy.SMALLEST_FIT, limit); } else if (memBufferAlg.equalsIgnoreCase("biggestfit")) { sorter = new ExternalSortOperatorDescriptor(spec, frameLimit, SortFields, null, @@ -158,8 +158,8 @@ spec.connect( new MToNPartitioningMergingConnectorDescriptor(spec, - new FieldHashPartitionComputerFactory(SortFields, orderBinaryHashFunctionFactories), - SortFields, SortFieldsComparatorFactories, new UTF8StringNormalizedKeyComputerFactory()), + new FieldHashPartitionComputerFactory(SortFields, orderBinaryHashFunctionFactories), SortFields, + SortFieldsComparatorFactories, new UTF8StringNormalizedKeyComputerFactory()), sorter, 0, printer, 0); spec.addRoot(printer); diff --git a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs/dataflow/DataflowTest.java b/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs/dataflow/DataflowTest.java index 0d0cd3e..b0aad4f 100644 --- a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs/dataflow/DataflowTest.java +++ b/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs/dataflow/DataflowTest.java @@ -73,7 +73,6 @@ private static final String PATH_TO_HADOOP_CONF = FileUtil.joinPath(TEST_RESOURCES, "hadoop", "conf"); protected static final String BUILD_DIR = FileUtil.joinPath("target", "build"); - private static final String DATA_PATH = FileUtil.joinPath(TEST_RESOURCES, "data", "customer.tbl"); protected static final String HDFS_INPUT_PATH = "/customer/"; protected static final String HDFS_OUTPUT_PATH = "/customer_result/"; @@ -151,11 +150,11 @@ String[] readSchedule = scheduler.getLocationConstraints(splits); JobSpecification jobSpec = new JobSpecification(); - RecordDescriptor recordDesc = new RecordDescriptor( - new ISerializerDeserializer[] { new UTF8StringSerializerDeserializer() }); + RecordDescriptor recordDesc = + new RecordDescriptor(new ISerializerDeserializer[] { new UTF8StringSerializerDeserializer() }); - String[] locations = new String[] { HyracksUtils.NC1_ID, HyracksUtils.NC1_ID, HyracksUtils.NC2_ID, - HyracksUtils.NC2_ID }; + String[] locations = + new String[] { HyracksUtils.NC1_ID, HyracksUtils.NC1_ID, HyracksUtils.NC2_ID, HyracksUtils.NC2_ID }; HDFSReadOperatorDescriptor readOperator = new HDFSReadOperatorDescriptor(jobSpec, recordDesc, conf, splits, readSchedule, new TextKeyValueParserFactory()); PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, readOperator, locations); @@ -164,21 +163,23 @@ new IBinaryComparatorFactory[] { RawBinaryComparatorFactory.INSTANCE }, recordDesc); PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, sortOperator, locations); - HDFSWriteOperatorDescriptor writeOperator = new HDFSWriteOperatorDescriptor(jobSpec, conf, - new TextTupleWriterFactory()); + HDFSWriteOperatorDescriptor writeOperator = + new HDFSWriteOperatorDescriptor(jobSpec, conf, new TextTupleWriterFactory()); PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, writeOperator, HyracksUtils.NC1_ID); jobSpec.connect(new OneToOneConnectorDescriptor(jobSpec), readOperator, 0, sortOperator, 0); - jobSpec.connect(new MToNPartitioningMergingConnectorDescriptor(jobSpec, new FieldHashPartitionComputerFactory( - new int[] { 0 }, new IBinaryHashFunctionFactory[] { RawBinaryHashFunctionFactory.INSTANCE }), - new int[] { 0 }, new IBinaryComparatorFactory[] { RawBinaryComparatorFactory.INSTANCE }, null), + jobSpec.connect( + new MToNPartitioningMergingConnectorDescriptor(jobSpec, + new FieldHashPartitionComputerFactory(new int[] { 0 }, + new IBinaryHashFunctionFactory[] { RawBinaryHashFunctionFactory.INSTANCE }), + new int[] { 0 }, new IBinaryComparatorFactory[] { RawBinaryComparatorFactory.INSTANCE }, null), sortOperator, 0, writeOperator, 0); jobSpec.addRoot(writeOperator); - IHyracksClientConnection client = new HyracksConnection(HyracksUtils.CC_HOST, - HyracksUtils.TEST_HYRACKS_CC_CLIENT_PORT); + IHyracksClientConnection client = + new HyracksConnection(HyracksUtils.CC_HOST, HyracksUtils.TEST_HYRACKS_CC_CLIENT_PORT); JobId jobId = client.startJob(jobSpec); - client.waitForCompletion(jobId); + client.waitForCompletion(jobId, Long.MAX_VALUE); Assert.assertEquals(true, checkResults()); } @@ -195,8 +196,8 @@ Path actual = new Path(ACTUAL_RESULT_DIR); dfs.copyToLocalFile(result, actual); - TestUtils.compareWithResult(new File(FileUtil.joinPath(EXPECTED_RESULT_PATH, "part-0")), new File( - FileUtil.joinPath(ACTUAL_RESULT_DIR, "customer_result", "part-0"))); + TestUtils.compareWithResult(new File(FileUtil.joinPath(EXPECTED_RESULT_PATH, "part-0")), + new File(FileUtil.joinPath(ACTUAL_RESULT_DIR, "customer_result", "part-0"))); return true; } diff --git a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs/utils/HyracksUtils.java b/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs/utils/HyracksUtils.java index 1fddc46..287baeb 100644 --- a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs/utils/HyracksUtils.java +++ b/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs/utils/HyracksUtils.java @@ -90,10 +90,10 @@ cc.stop(); } - public static void runJob(JobSpecification spec, String appName) throws Exception { + public static void runJob(JobSpecification spec, String appName, long timeout) throws Exception { spec.setFrameSize(FRAME_SIZE); JobId jobId = hcc.startJob(spec, EnumSet.of(JobFlag.PROFILE_RUNTIME)); - hcc.waitForCompletion(jobId); + hcc.waitForCompletion(jobId, timeout); } } diff --git a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs2/dataflow/DataflowTest.java b/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs2/dataflow/DataflowTest.java index 3c9b1c0..84cdb65 100644 --- a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs2/dataflow/DataflowTest.java +++ b/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs2/dataflow/DataflowTest.java @@ -21,7 +21,6 @@ import java.util.List; -import junit.framework.Assert; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.MiniDFSCluster; @@ -55,6 +54,8 @@ import org.apache.hyracks.hdfs.utils.HyracksUtils; import org.apache.hyracks.hdfs2.scheduler.Scheduler; +import junit.framework.Assert; + /** * Test the org.apache.hyracks.hdfs2.dataflow package, * the operators for the Hadoop new API. @@ -86,6 +87,7 @@ * * @throws Exception */ + @Override @SuppressWarnings({ "rawtypes", "unchecked" }) public void testHDFSReadWriteOperators() throws Exception { FileInputFormat.setInputPaths(conf, HDFS_INPUT_PATH); @@ -98,11 +100,11 @@ String[] readSchedule = scheduler.getLocationConstraints(splits); JobSpecification jobSpec = new JobSpecification(); - RecordDescriptor recordDesc = new RecordDescriptor( - new ISerializerDeserializer[] { new UTF8StringSerializerDeserializer() }); + RecordDescriptor recordDesc = + new RecordDescriptor(new ISerializerDeserializer[] { new UTF8StringSerializerDeserializer() }); - String[] locations = new String[] { HyracksUtils.NC1_ID, HyracksUtils.NC1_ID, HyracksUtils.NC2_ID, - HyracksUtils.NC2_ID }; + String[] locations = + new String[] { HyracksUtils.NC1_ID, HyracksUtils.NC1_ID, HyracksUtils.NC2_ID, HyracksUtils.NC2_ID }; HDFSReadOperatorDescriptor readOperator = new HDFSReadOperatorDescriptor(jobSpec, recordDesc, conf, splits, readSchedule, new TextKeyValueParserFactory()); PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, readOperator, locations); @@ -111,21 +113,23 @@ new IBinaryComparatorFactory[] { RawBinaryComparatorFactory.INSTANCE }, recordDesc); PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, sortOperator, locations); - HDFSWriteOperatorDescriptor writeOperator = new HDFSWriteOperatorDescriptor(jobSpec, conf, - new TextTupleWriterFactory()); + HDFSWriteOperatorDescriptor writeOperator = + new HDFSWriteOperatorDescriptor(jobSpec, conf, new TextTupleWriterFactory()); PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, writeOperator, HyracksUtils.NC1_ID); jobSpec.connect(new OneToOneConnectorDescriptor(jobSpec), readOperator, 0, sortOperator, 0); - jobSpec.connect(new MToNPartitioningMergingConnectorDescriptor(jobSpec, new FieldHashPartitionComputerFactory( - new int[] { 0 }, new IBinaryHashFunctionFactory[] { RawBinaryHashFunctionFactory.INSTANCE }), - new int[] { 0 }, new IBinaryComparatorFactory[] { RawBinaryComparatorFactory.INSTANCE }, null), + jobSpec.connect( + new MToNPartitioningMergingConnectorDescriptor(jobSpec, + new FieldHashPartitionComputerFactory(new int[] { 0 }, + new IBinaryHashFunctionFactory[] { RawBinaryHashFunctionFactory.INSTANCE }), + new int[] { 0 }, new IBinaryComparatorFactory[] { RawBinaryComparatorFactory.INSTANCE }, null), sortOperator, 0, writeOperator, 0); jobSpec.addRoot(writeOperator); - IHyracksClientConnection client = new HyracksConnection(HyracksUtils.CC_HOST, - HyracksUtils.TEST_HYRACKS_CC_CLIENT_PORT); + IHyracksClientConnection client = + new HyracksConnection(HyracksUtils.CC_HOST, HyracksUtils.TEST_HYRACKS_CC_CLIENT_PORT); JobId jobId = client.startJob(jobSpec); - client.waitForCompletion(jobId); + client.waitForCompletion(jobId, Long.MAX_VALUE); Assert.assertEquals(true, checkResults()); } diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java index 45634ad..dfd5bad 100644 --- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java +++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java @@ -260,7 +260,7 @@ return b && (path.length() == cpl || '/' == path.charAt(cpl)); } - protected HttpServerHandler<HttpServer> createHttpHandler(int chunkSize) { + protected HttpServerHandler createHttpHandler(int chunkSize) { return new HttpServerHandler<>(this, chunkSize); } diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/RPCInterface.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/RPCInterface.java index aebe2f5..3c0f699 100644 --- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/RPCInterface.java +++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/RPCInterface.java @@ -20,6 +20,7 @@ import java.util.HashMap; import java.util.Map; +import java.util.concurrent.TimeoutException; import org.apache.hyracks.ipc.exceptions.IPCException; @@ -27,10 +28,10 @@ private final Map<Long, Request> reqMap; public RPCInterface() { - reqMap = new HashMap<Long, RPCInterface.Request>(); + reqMap = new HashMap<>(); } - public Object call(IIPCHandle handle, Object request) throws Exception { + public Object call(IIPCHandle handle, Object request, long timeout) throws Exception { Request req; long mid; synchronized (this) { @@ -41,7 +42,7 @@ mid = handle.send(-1, request, null); reqMap.put(mid, req); } - return req.getResponse(); + return req.getResponse(timeout); } @Override @@ -88,10 +89,15 @@ notifyAll(); } - synchronized Object getResponse() throws Exception { - while (pending) { + synchronized Object getResponse(long timeout) throws Exception { + long start = System.currentTimeMillis(); + long now = start; + while (pending && now - start < timeout) { wait(); } + if (pending) { + throw new TimeoutException(); + } if (exception != null) { throw exception; } diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/test/java/org/apache/hyracks/ipc/tests/IPCTest.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/test/java/org/apache/hyracks/ipc/tests/IPCTest.java index b454520..1125c64 100644 --- a/hyracks-fullstack/hyracks/hyracks-ipc/src/test/java/org/apache/hyracks/ipc/tests/IPCTest.java +++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/test/java/org/apache/hyracks/ipc/tests/IPCTest.java @@ -23,16 +23,15 @@ import java.util.concurrent.Executor; import java.util.concurrent.Executors; -import junit.framework.Assert; - -import org.junit.Test; - import org.apache.hyracks.ipc.api.IIPCHandle; import org.apache.hyracks.ipc.api.IIPCI; import org.apache.hyracks.ipc.api.RPCInterface; import org.apache.hyracks.ipc.exceptions.IPCException; import org.apache.hyracks.ipc.impl.IPCSystem; import org.apache.hyracks.ipc.impl.JavaSerializationBasedPayloadSerializerDeserializer; +import org.junit.Test; + +import junit.framework.Assert; public class IPCTest { @Test @@ -48,11 +47,11 @@ IIPCHandle handle = client.getHandle(serverAddr); for (int i = 0; i < 100; ++i) { - Assert.assertEquals(rpci.call(handle, Integer.valueOf(i)), Integer.valueOf(2 * i)); + Assert.assertEquals(rpci.call(handle, Integer.valueOf(i), Long.MAX_VALUE), Integer.valueOf(2 * i)); } try { - rpci.call(handle, "Foo"); + rpci.call(handle, "Foo", Long.MAX_VALUE); Assert.assertTrue(false); } catch (Exception e) { Assert.assertTrue(true); @@ -63,8 +62,8 @@ final Executor executor = Executors.newCachedThreadPool(); IIPCI ipci = new IIPCI() { @Override - public void deliverIncomingMessage(final IIPCHandle handle, final long mid, long rmid, - final Object payload, Exception exception) { + public void deliverIncomingMessage(final IIPCHandle handle, final long mid, long rmid, final Object payload, + Exception exception) { executor.execute(new Runnable() { @Override public void run() { -- To view, visit https://asterix-gerrit.ics.uci.edu/1961 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: newchange Gerrit-Change-Id: I14b4bbd512cc88e489254d8bf82edba0fd3a3db5 Gerrit-PatchSet: 1 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: abdullah alamoudi <bamou...@gmail.com>