>From Ian Maxon <[email protected]>: Ian Maxon has submitted this change. ( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20292?usp=email )
Change subject: [ASTERIXDB-3644] Callback for result consumption ...................................................................... [ASTERIXDB-3644] Callback for result consumption - user model changes: no - storage format changes: no - interface changes: yes Details: Add a callback for when a result partition is consumed. Then, when all results are consumed, remove the job from the result directory. Ext-ref: MB-68455 Change-Id: I768eab7bd3ed1dbfda1bd3449264c92952b4cb53 Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20292 Reviewed-by: Peeyush Gupta <[email protected]> Tested-by: Ian Maxon <[email protected]> Integration-Tests: Ian Maxon <[email protected]> --- M asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java M asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/AsyncDeferredQueries.xml M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IResultDirectory.java M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IResultPartitionManager.java M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultDirectoryRecord.java M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultJobRecord.java M hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultDirectory.java M hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultDirectoryRemoteProxy.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/IResultDirectoryService.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java A hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ReportResultPartitionConsumedWork.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/ResultPartitionManager.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/ResultPartitionReader.java 17 files changed, 188 insertions(+), 9 deletions(-) Approvals: Ian Maxon: Verified; Verified Peeyush Gupta: Looks good to me, approved Objections: Anon. E. Moose #1000171: Violations found diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java index 3b0e031..09ea35e 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java @@ -65,9 +65,10 @@ } IResultSet resultSet = getResultSet(); ResultReader resultReader = new ResultReader(resultSet, handle.getJobId(), handle.getResultSetId()); + boolean printStarted = false; + ResponsePrinter printer = null; try { ResultJobRecord.Status status = resultReader.getStatus(); - final HttpResponseStatus httpStatus; if (status == null) { httpStatus = HttpResponseStatus.NOT_FOUND; @@ -79,6 +80,7 @@ case RUNNING: case IDLE: case FAILED: + case REMOVED: httpStatus = HttpResponseStatus.NOT_FOUND; break; default: @@ -92,12 +94,13 @@ } ResultMetadata metadata = (ResultMetadata) resultReader.getMetadata(); SessionOutput sessionOutput = initResponse(request, response, metadata.getFormat()); - ResponsePrinter printer = new ResponsePrinter(sessionOutput); + printer = new ResponsePrinter(sessionOutput); if (metadata.getFormat() == SessionConfig.OutputFormat.CLEAN_JSON || metadata.getFormat() == SessionConfig.OutputFormat.LOSSLESS_JSON || metadata.getFormat() == SessionConfig.OutputFormat.LOSSLESS_ADM_JSON) { final Stats stats = new Stats(); printer.begin(); + printStarted = true; printer.addResultPrinter(new ResultsPrinter(appCtx, resultReader, null, stats, sessionOutput)); printer.printResults(); ResponseMetrics metrics = ResponseMetrics.of(System.nanoTime() - elapsedStart, @@ -112,6 +115,7 @@ } printer.printFooters(); printer.end(); + printStarted = false; } else { ResultUtil.printResults(appCtx, resultReader, sessionOutput, new Stats(), null); } @@ -127,6 +131,10 @@ } catch (Exception e) { response.setStatus(HttpResponseStatus.BAD_REQUEST); LOGGER.log(Level.WARN, "Error retrieving result for \"" + strHandle + "\"", e); + } finally { + if (printStarted && printer != null) { + printer.end(); + } } if (response.writer().checkError()) { LOGGER.warn("Error flushing output writer for \"" + strHandle + "\""); diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/AsyncDeferredQueries.xml b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/AsyncDeferredQueries.xml index 9a5e17e..a6aef7c 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/AsyncDeferredQueries.xml +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/AsyncDeferredQueries.xml @@ -59,7 +59,7 @@ <compilation-unit name="async-exhausted-result"> <output-dir compare="Clean-JSON">async-exhausted-result</output-dir> <parameter name="profile" value="timings" type="string"/> - <expected-error>Premature end of chunk</expected-error> <!--TODO:REVISIT --> + <expected-error>HTTP/1.1 404 Not Found</expected-error> <source-location>false</source-location> </compilation-unit> </test-case> diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IResultDirectory.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IResultDirectory.java index 4798494..184cb7c 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IResultDirectory.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IResultDirectory.java @@ -57,4 +57,6 @@ * @throws Exception */ IResultMetadata getResultMetadata(JobId jobId, ResultSetId rsId) throws Exception; + + void releaseResult(JobId jobId, ResultSetId rsId); } diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IResultPartitionManager.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IResultPartitionManager.java index 4909903..f107fb8 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IResultPartitionManager.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IResultPartitionManager.java @@ -32,10 +32,12 @@ void reportPartitionWriteCompletion(JobId jobId, ResultSetId resultSetId, int partition) throws HyracksException; + void reportPartitionConsumed(JobId jobId, ResultSetId rsId, int partition) throws HyracksException; + void initializeResultPartitionReader(JobId jobId, ResultSetId resultSetId, int partition, IFrameWriter noc) throws HyracksException; - void removePartition(JobId jobId, ResultSetId resultSetId, int partition); + void removePartition(JobId jobId, ResultSetId resultSetId, int partition) throws HyracksException; void abortReader(JobId jobId); diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultDirectoryRecord.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultDirectoryRecord.java index 71792be..a442c11c 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultDirectoryRecord.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultDirectoryRecord.java @@ -78,6 +78,10 @@ updateStatus(Status.SUCCESS); } + public boolean ready() { + return status == Status.SUCCESS; + } + public void fail() { status = Status.FAILED; } diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultJobRecord.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultJobRecord.java index 02762ee..44de45f 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultJobRecord.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultJobRecord.java @@ -34,6 +34,7 @@ IDLE, RUNNING, SUCCESS, + REMOVED, FAILED } @@ -114,6 +115,14 @@ updateState(State.SUCCESS); } + public void consume() { + updateState(State.REMOVED); + } + + public boolean consumed() { + return status.getState() == State.REMOVED; + } + public void fail(List<Exception> exceptions) { updateState(State.FAILED); status.setExceptions(exceptions); @@ -166,13 +175,21 @@ public synchronized void updateState() { int successCount = 0; + int consumedCount = 0; ResultDirectoryRecord[] records = resultSetMetaData.getRecords(); for (ResultDirectoryRecord record : records) { - if ((record != null) && (record.getStatus() == ResultDirectoryRecord.Status.SUCCESS)) { - successCount++; + if (record != null) { + if (record.ready()) { + successCount++; + } + if (record.hasReachedReadEOS()) { + consumedCount++; + } } } - if (successCount == records.length) { + if (consumedCount == records.length) { + consume(); + } else if (successCount == records.length) { success(); } } diff --git a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultDirectory.java b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultDirectory.java index 5dd5fff..1344473 100644 --- a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultDirectory.java +++ b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultDirectory.java @@ -67,6 +67,11 @@ } @Override + public void releaseResult(JobId jobId, ResultSetId rsId) { + + } + + @Override public void close() throws IOException { ipc.stop(); } diff --git a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultDirectoryRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultDirectoryRemoteProxy.java index ed628f7..f5c4a3a 100644 --- a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultDirectoryRemoteProxy.java +++ b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultDirectoryRemoteProxy.java @@ -60,4 +60,9 @@ new HyracksClientInterfaceFunctions.GetResultMetadataFunction(jobId, rsId); return (IResultMetadata) rpci.call(ipcHandle, grmf); } + + @Override + public void releaseResult(JobId jobId, ResultSetId rsId) { + + } } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java index d350f61..6d32621 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java @@ -36,6 +36,7 @@ import org.apache.hyracks.control.cc.work.RegisterPartitionRequestWork; import org.apache.hyracks.control.cc.work.RegisterResultPartitionLocationWork; import org.apache.hyracks.control.cc.work.ReportProfilesWork; +import org.apache.hyracks.control.cc.work.ReportResultPartitionConsumedWork; import org.apache.hyracks.control.cc.work.ReportResultPartitionWriteCompletionWork; import org.apache.hyracks.control.cc.work.TaskCompleteWork; import org.apache.hyracks.control.cc.work.TaskFailureWork; @@ -127,6 +128,12 @@ ccs.getWorkQueue().schedule(new ReportResultPartitionWriteCompletionWork(ccs, rrpwc.getJobId(), rrpwc.getResultSetId(), rrpwc.getPartition())); break; + case REPORT_RESULT_PARTITION_CONSUMED: + CCNCFunctions.ReportResultPartitionConsumedFunction rrpc = + (CCNCFunctions.ReportResultPartitionConsumedFunction) fn; + ccs.getWorkQueue().schedule(new ReportResultPartitionConsumedWork(ccs, rrpc.getJobId(), + rrpc.getResultSetId(), rrpc.getPartition())); + break; case SEND_APPLICATION_MESSAGE: CCNCFunctions.SendApplicationMessageFunction rsf = (CCNCFunctions.SendApplicationMessageFunction) fn; ApplicationMessageWork work = diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/IResultDirectoryService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/IResultDirectoryService.java index b1ecb45..a986f33 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/IResultDirectoryService.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/IResultDirectoryService.java @@ -43,6 +43,8 @@ public void reportResultPartitionWriteCompletion(JobId jobId, ResultSetId rsId, int partition) throws HyracksDataException; + public void reportResultPartitionConsumed(JobId jobId, ResultSetId rsId, int partition) throws HyracksDataException; + public void reportJobFailure(JobId jobId, List<Exception> exceptions); public Status getResultStatus(JobId jobId, ResultSetId rsId) throws HyracksDataException; diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java index b6274d9..139775c 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java @@ -155,6 +155,18 @@ } @Override + public synchronized void reportResultPartitionConsumed(JobId jobId, ResultSetId rsId, int partition) + throws HyracksDataException { + ResultJobRecord djr = getNonNullResultJobRecord(jobId); + djr.getDirectoryRecord(partition).readEOS(); + djr.updateState(); + if (djr.consumed()) { + sweep(jobId); + } + notifyAll(); + } + + @Override public synchronized void reportJobFailure(JobId jobId, List<Exception> exceptions) { ResultJobRecord rjr = getResultJobRecord(jobId); if (logFailure(rjr)) { diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ReportResultPartitionConsumedWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ReportResultPartitionConsumedWork.java new file mode 100644 index 0000000..5e61851 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ReportResultPartitionConsumedWork.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.control.cc.work; + +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.job.JobId; +import org.apache.hyracks.api.result.ResultSetId; +import org.apache.hyracks.control.cc.ClusterControllerService; +import org.apache.hyracks.control.common.work.AbstractWork; + +public class ReportResultPartitionConsumedWork extends AbstractWork { + private final ClusterControllerService ccs; + + private final JobId jobId; + + private final ResultSetId rsId; + + private final int partition; + + public ReportResultPartitionConsumedWork(ClusterControllerService ccs, JobId jobId, ResultSetId rsId, + int partition) { + this.ccs = ccs; + this.jobId = jobId; + this.rsId = rsId; + this.partition = partition; + } + + @Override + public void run() { + try { + ccs.getResultDirectoryService().reportResultPartitionConsumed(jobId, rsId, partition); + } catch (HyracksDataException e) { + throw new RuntimeException(e); + } + } + + @Override + public String toString() { + return getName() + ": JobId@" + jobId + " ResultSetId@" + rsId + " Partition@" + partition; + } +} diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java index 1c91183..d706c58 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java @@ -73,6 +73,8 @@ void reportResultPartitionWriteCompletion(JobId jobId, ResultSetId rsId, int partition) throws Exception; + void reportResultPartitionConsumed(JobId jobId, ResultSetId rsId, int partition) throws Exception; + void getNodeControllerInfos() throws Exception; void notifyThreadDump(String nodeId, String requestId, String threadDumpJSON) throws Exception; diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java index 1939f9e..ace6095 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java @@ -93,6 +93,7 @@ REGISTER_PARTITION_REQUEST, REGISTER_RESULT_PARTITION_LOCATION, REPORT_RESULT_PARTITION_WRITE_COMPLETION, + REPORT_RESULT_PARTITION_CONSUMED, NODE_REGISTRATION_RESULT, START_TASKS, @@ -686,6 +687,39 @@ } } + public static class ReportResultPartitionConsumedFunction extends Function { + private static final long serialVersionUID = 1L; + + private final JobId jobId; + + private final ResultSetId rsId; + + private final int partition; + + public ReportResultPartitionConsumedFunction(JobId jobId, ResultSetId rsId, int partition) { + this.jobId = jobId; + this.rsId = rsId; + this.partition = partition; + } + + @Override + public FunctionId getFunctionId() { + return FunctionId.REPORT_RESULT_PARTITION_CONSUMED; + } + + public JobId getJobId() { + return jobId; + } + + public ResultSetId getResultSetId() { + return rsId; + } + + public int getPartition() { + return partition; + } + } + public static class NodeRegistrationResult extends Function { private static final long serialVersionUID = 1L; diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java index 09dc04d..5313dad 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java @@ -44,6 +44,7 @@ import org.apache.hyracks.control.common.ipc.CCNCFunctions.RegisterResultPartitionLocationFunction; import org.apache.hyracks.control.common.ipc.CCNCFunctions.ReportDeployedJobSpecFailureFunction; import org.apache.hyracks.control.common.ipc.CCNCFunctions.ReportProfileFunction; +import org.apache.hyracks.control.common.ipc.CCNCFunctions.ReportResultPartitionConsumedFunction; import org.apache.hyracks.control.common.ipc.CCNCFunctions.ReportResultPartitionWriteCompletionFunction; import org.apache.hyracks.control.common.ipc.CCNCFunctions.SendApplicationMessageFunction; import org.apache.hyracks.control.common.ipc.CCNCFunctions.ShutdownResponseFunction; @@ -155,6 +156,12 @@ } @Override + public void reportResultPartitionConsumed(JobId jobId, ResultSetId rsId, int partition) throws Exception { + ReportResultPartitionConsumedFunction fn = new ReportResultPartitionConsumedFunction(jobId, rsId, partition); + ipcHandle.send(-1, fn, null); + } + + @Override public void notifyDeployedJobSpecFailure(DeployedJobSpecId deployedJobSpecId, String nodeId) throws Exception { ReportDeployedJobSpecFailureFunction fn = new ReportDeployedJobSpecFailureFunction(deployedJobSpecId, nodeId); ipcHandle.send(-1, fn, null); diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/ResultPartitionManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/ResultPartitionManager.java index 165cb95..557b74b 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/ResultPartitionManager.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/ResultPartitionManager.java @@ -110,6 +110,17 @@ } @Override + public void reportPartitionConsumed(JobId jobId, ResultSetId rsId, int partition) throws HyracksException { + try { + LOGGER.trace("Reporting partition consumed: JobId: {}:ResultSetId: {}:partition: {}", jobId, rsId, + partition); + ncs.getClusterController(jobId.getCcId()).reportResultPartitionConsumed(jobId, rsId, partition); + } catch (Exception e) { + throw HyracksException.create(e); + } + } + + @Override public void initializeResultPartitionReader(JobId jobId, ResultSetId resultSetId, int partition, IFrameWriter writer) throws HyracksException { ResultState resultState = getResultState(jobId, resultSetId, partition); @@ -137,10 +148,12 @@ } @Override - public synchronized void removePartition(JobId jobId, ResultSetId resultSetId, int partition) { + public synchronized void removePartition(JobId jobId, ResultSetId resultSetId, int partition) + throws HyracksException { ResultSetMap rsIdMap = partitionResultStateMap.get(jobId); if (rsIdMap != null && rsIdMap.removePartition(jobId, resultSetId, partition)) { partitionResultStateMap.remove(jobId); + reportPartitionConsumed(jobId, resultSetId, partition); } } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/ResultPartitionReader.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/ResultPartitionReader.java index 3774530..43042b3 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/ResultPartitionReader.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/ResultPartitionReader.java @@ -23,6 +23,7 @@ import org.apache.hyracks.api.comm.IFrameWriter; import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.exceptions.HyracksException; import org.apache.hyracks.api.partitions.ResultSetPartitionId; import org.apache.hyracks.comm.channels.NetworkOutputChannel; import org.apache.hyracks.net.protocols.muxdemux.AbstractChannelWriteInterface; @@ -102,8 +103,9 @@ final ResultSetPartitionId partitionId = resultState.getResultSetPartitionId(); resultPartitionManager.removePartition(partitionId.getJobId(), partitionId.getResultSetId(), partitionId.getPartition()); + } - } catch (HyracksDataException e) { + } catch (HyracksException e) { LOGGER.error("unexpected failure in partition reader clean up", e); } } -- To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20292?usp=email To unsubscribe, or for help writing mail filters, visit https://asterix-gerrit.ics.uci.edu/settings?usp=email Gerrit-MessageType: merged Gerrit-Project: asterixdb Gerrit-Branch: phoenix Gerrit-Change-Id: I768eab7bd3ed1dbfda1bd3449264c92952b4cb53 Gerrit-Change-Number: 20292 Gerrit-PatchSet: 7 Gerrit-Owner: Ian Maxon <[email protected]> Gerrit-Reviewer: Anon. E. Moose #1000171 Gerrit-Reviewer: Ian Maxon <[email protected]> Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Murtadha Hubail <[email protected]> Gerrit-Reviewer: Peeyush Gupta <[email protected]>
