>From Ian Maxon <[email protected]>:

Ian Maxon has submitted this change. ( 
https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20292?usp=email )

Change subject: [ASTERIXDB-3644] Callback for result consumption
......................................................................

[ASTERIXDB-3644] Callback for result consumption

- user model changes: no
- storage format changes: no
- interface changes: yes

Details:

Add a callback for when a result partition is consumed.
Then, when all results are consumed, remove the job
from the result directory.

Ext-ref: MB-68455

Change-Id: I768eab7bd3ed1dbfda1bd3449264c92952b4cb53
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20292
Reviewed-by: Peeyush Gupta <[email protected]>
Tested-by: Ian Maxon <[email protected]>
Integration-Tests: Ian Maxon <[email protected]>
---
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java
M 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/AsyncDeferredQueries.xml
M 
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IResultDirectory.java
M 
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IResultPartitionManager.java
M 
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultDirectoryRecord.java
M 
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultJobRecord.java
M 
hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultDirectory.java
M 
hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultDirectoryRemoteProxy.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/IResultDirectoryService.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java
A 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ReportResultPartitionConsumedWork.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/ResultPartitionManager.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/ResultPartitionReader.java
17 files changed, 188 insertions(+), 9 deletions(-)

Approvals:
  Ian Maxon: Verified; Verified
  Peeyush Gupta: Looks good to me, approved

Objections:
  Anon. E. Moose #1000171: Violations found




diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java
index 3b0e031..09ea35e 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java
@@ -65,9 +65,10 @@
         }
         IResultSet resultSet = getResultSet();
         ResultReader resultReader = new ResultReader(resultSet, 
handle.getJobId(), handle.getResultSetId());
+        boolean printStarted = false;
+        ResponsePrinter printer = null;
         try {
             ResultJobRecord.Status status = resultReader.getStatus();
-
             final HttpResponseStatus httpStatus;
             if (status == null) {
                 httpStatus = HttpResponseStatus.NOT_FOUND;
@@ -79,6 +80,7 @@
                     case RUNNING:
                     case IDLE:
                     case FAILED:
+                    case REMOVED:
                         httpStatus = HttpResponseStatus.NOT_FOUND;
                         break;
                     default:
@@ -92,12 +94,13 @@
             }
             ResultMetadata metadata = (ResultMetadata) 
resultReader.getMetadata();
             SessionOutput sessionOutput = initResponse(request, response, 
metadata.getFormat());
-            ResponsePrinter printer = new ResponsePrinter(sessionOutput);
+            printer = new ResponsePrinter(sessionOutput);
             if (metadata.getFormat() == SessionConfig.OutputFormat.CLEAN_JSON
                     || metadata.getFormat() == 
SessionConfig.OutputFormat.LOSSLESS_JSON
                     || metadata.getFormat() == 
SessionConfig.OutputFormat.LOSSLESS_ADM_JSON) {
                 final Stats stats = new Stats();
                 printer.begin();
+                printStarted = true;
                 printer.addResultPrinter(new ResultsPrinter(appCtx, 
resultReader, null, stats, sessionOutput));
                 printer.printResults();
                 ResponseMetrics metrics = ResponseMetrics.of(System.nanoTime() 
- elapsedStart,
@@ -112,6 +115,7 @@
                 }
                 printer.printFooters();
                 printer.end();
+                printStarted = false;
             } else {
                 ResultUtil.printResults(appCtx, resultReader, sessionOutput, 
new Stats(), null);
             }
@@ -127,6 +131,10 @@
         } catch (Exception e) {
             response.setStatus(HttpResponseStatus.BAD_REQUEST);
             LOGGER.log(Level.WARN, "Error retrieving result for \"" + 
strHandle + "\"", e);
+        } finally {
+            if (printStarted && printer != null) {
+                printer.end();
+            }
         }
         if (response.writer().checkError()) {
             LOGGER.warn("Error flushing output writer for \"" + strHandle + 
"\"");
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/AsyncDeferredQueries.xml
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/AsyncDeferredQueries.xml
index 9a5e17e..a6aef7c 100644
--- 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/AsyncDeferredQueries.xml
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/AsyncDeferredQueries.xml
@@ -59,7 +59,7 @@
         <compilation-unit name="async-exhausted-result">
             <output-dir 
compare="Clean-JSON">async-exhausted-result</output-dir>
             <parameter name="profile" value="timings" type="string"/>
-            <expected-error>Premature end of chunk</expected-error> 
<!--TODO:REVISIT -->
+            <expected-error>HTTP/1.1 404 Not Found</expected-error>
             <source-location>false</source-location>
         </compilation-unit>
     </test-case>
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IResultDirectory.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IResultDirectory.java
index 4798494..184cb7c 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IResultDirectory.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IResultDirectory.java
@@ -57,4 +57,6 @@
      * @throws Exception
      */
     IResultMetadata getResultMetadata(JobId jobId, ResultSetId rsId) throws 
Exception;
+
+    void releaseResult(JobId jobId, ResultSetId rsId);
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IResultPartitionManager.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IResultPartitionManager.java
index 4909903..f107fb8 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IResultPartitionManager.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IResultPartitionManager.java
@@ -32,10 +32,12 @@

     void reportPartitionWriteCompletion(JobId jobId, ResultSetId resultSetId, 
int partition) throws HyracksException;

+    void reportPartitionConsumed(JobId jobId, ResultSetId rsId, int partition) 
throws HyracksException;
+
     void initializeResultPartitionReader(JobId jobId, ResultSetId resultSetId, 
int partition, IFrameWriter noc)
             throws HyracksException;

-    void removePartition(JobId jobId, ResultSetId resultSetId, int partition);
+    void removePartition(JobId jobId, ResultSetId resultSetId, int partition) 
throws HyracksException;

     void abortReader(JobId jobId);

diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultDirectoryRecord.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultDirectoryRecord.java
index 71792be..a442c11c 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultDirectoryRecord.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultDirectoryRecord.java
@@ -78,6 +78,10 @@
         updateStatus(Status.SUCCESS);
     }

+    public boolean ready() {
+        return status == Status.SUCCESS;
+    }
+
     public void fail() {
         status = Status.FAILED;
     }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultJobRecord.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultJobRecord.java
index 02762ee..44de45f 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultJobRecord.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultJobRecord.java
@@ -34,6 +34,7 @@
         IDLE,
         RUNNING,
         SUCCESS,
+        REMOVED,
         FAILED
     }

@@ -114,6 +115,14 @@
         updateState(State.SUCCESS);
     }

+    public void consume() {
+        updateState(State.REMOVED);
+    }
+
+    public boolean consumed() {
+        return status.getState() == State.REMOVED;
+    }
+
     public void fail(List<Exception> exceptions) {
         updateState(State.FAILED);
         status.setExceptions(exceptions);
@@ -166,13 +175,21 @@

     public synchronized void updateState() {
         int successCount = 0;
+        int consumedCount = 0;
         ResultDirectoryRecord[] records = resultSetMetaData.getRecords();
         for (ResultDirectoryRecord record : records) {
-            if ((record != null) && (record.getStatus() == 
ResultDirectoryRecord.Status.SUCCESS)) {
-                successCount++;
+            if (record != null) {
+                if (record.ready()) {
+                    successCount++;
+                }
+                if (record.hasReachedReadEOS()) {
+                    consumedCount++;
+                }
             }
         }
-        if (successCount == records.length) {
+        if (consumedCount == records.length) {
+            consume();
+        } else if (successCount == records.length) {
             success();
         }
     }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultDirectory.java
 
b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultDirectory.java
index 5dd5fff..1344473 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultDirectory.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultDirectory.java
@@ -67,6 +67,11 @@
     }

     @Override
+    public void releaseResult(JobId jobId, ResultSetId rsId) {
+
+    }
+
+    @Override
     public void close() throws IOException {
         ipc.stop();
     }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultDirectoryRemoteProxy.java
 
b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultDirectoryRemoteProxy.java
index ed628f7..f5c4a3a 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultDirectoryRemoteProxy.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultDirectoryRemoteProxy.java
@@ -60,4 +60,9 @@
                 new 
HyracksClientInterfaceFunctions.GetResultMetadataFunction(jobId, rsId);
         return (IResultMetadata) rpci.call(ipcHandle, grmf);
     }
+
+    @Override
+    public void releaseResult(JobId jobId, ResultSetId rsId) {
+
+    }
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java
index d350f61..6d32621 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java
@@ -36,6 +36,7 @@
 import org.apache.hyracks.control.cc.work.RegisterPartitionRequestWork;
 import org.apache.hyracks.control.cc.work.RegisterResultPartitionLocationWork;
 import org.apache.hyracks.control.cc.work.ReportProfilesWork;
+import org.apache.hyracks.control.cc.work.ReportResultPartitionConsumedWork;
 import 
org.apache.hyracks.control.cc.work.ReportResultPartitionWriteCompletionWork;
 import org.apache.hyracks.control.cc.work.TaskCompleteWork;
 import org.apache.hyracks.control.cc.work.TaskFailureWork;
@@ -127,6 +128,12 @@
                 ccs.getWorkQueue().schedule(new 
ReportResultPartitionWriteCompletionWork(ccs, rrpwc.getJobId(),
                         rrpwc.getResultSetId(), rrpwc.getPartition()));
                 break;
+            case REPORT_RESULT_PARTITION_CONSUMED:
+                CCNCFunctions.ReportResultPartitionConsumedFunction rrpc =
+                        (CCNCFunctions.ReportResultPartitionConsumedFunction) 
fn;
+                ccs.getWorkQueue().schedule(new 
ReportResultPartitionConsumedWork(ccs, rrpc.getJobId(),
+                        rrpc.getResultSetId(), rrpc.getPartition()));
+                break;
             case SEND_APPLICATION_MESSAGE:
                 CCNCFunctions.SendApplicationMessageFunction rsf = 
(CCNCFunctions.SendApplicationMessageFunction) fn;
                 ApplicationMessageWork work =
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/IResultDirectoryService.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/IResultDirectoryService.java
index b1ecb45..a986f33 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/IResultDirectoryService.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/IResultDirectoryService.java
@@ -43,6 +43,8 @@
     public void reportResultPartitionWriteCompletion(JobId jobId, ResultSetId 
rsId, int partition)
             throws HyracksDataException;

+    public void reportResultPartitionConsumed(JobId jobId, ResultSetId rsId, 
int partition) throws HyracksDataException;
+
     public void reportJobFailure(JobId jobId, List<Exception> exceptions);

     public Status getResultStatus(JobId jobId, ResultSetId rsId) throws 
HyracksDataException;
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java
index b6274d9..139775c 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java
@@ -155,6 +155,18 @@
     }

     @Override
+    public synchronized void reportResultPartitionConsumed(JobId jobId, 
ResultSetId rsId, int partition)
+            throws HyracksDataException {
+        ResultJobRecord djr = getNonNullResultJobRecord(jobId);
+        djr.getDirectoryRecord(partition).readEOS();
+        djr.updateState();
+        if (djr.consumed()) {
+            sweep(jobId);
+        }
+        notifyAll();
+    }
+
+    @Override
     public synchronized void reportJobFailure(JobId jobId, List<Exception> 
exceptions) {
         ResultJobRecord rjr = getResultJobRecord(jobId);
         if (logFailure(rjr)) {
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ReportResultPartitionConsumedWork.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ReportResultPartitionConsumedWork.java
new file mode 100644
index 0000000..5e61851
--- /dev/null
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ReportResultPartitionConsumedWork.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.cc.work;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.result.ResultSetId;
+import org.apache.hyracks.control.cc.ClusterControllerService;
+import org.apache.hyracks.control.common.work.AbstractWork;
+
+public class ReportResultPartitionConsumedWork extends AbstractWork {
+    private final ClusterControllerService ccs;
+
+    private final JobId jobId;
+
+    private final ResultSetId rsId;
+
+    private final int partition;
+
+    public ReportResultPartitionConsumedWork(ClusterControllerService ccs, 
JobId jobId, ResultSetId rsId,
+            int partition) {
+        this.ccs = ccs;
+        this.jobId = jobId;
+        this.rsId = rsId;
+        this.partition = partition;
+    }
+
+    @Override
+    public void run() {
+        try {
+            
ccs.getResultDirectoryService().reportResultPartitionConsumed(jobId, rsId, 
partition);
+        } catch (HyracksDataException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public String toString() {
+        return getName() + ": JobId@" + jobId + " ResultSetId@" + rsId + " 
Partition@" + partition;
+    }
+}
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
index 1c91183..d706c58 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
@@ -73,6 +73,8 @@

     void reportResultPartitionWriteCompletion(JobId jobId, ResultSetId rsId, 
int partition) throws Exception;

+    void reportResultPartitionConsumed(JobId jobId, ResultSetId rsId, int 
partition) throws Exception;
+
     void getNodeControllerInfos() throws Exception;

     void notifyThreadDump(String nodeId, String requestId, String 
threadDumpJSON) throws Exception;
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
index 1939f9e..ace6095 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
@@ -93,6 +93,7 @@
         REGISTER_PARTITION_REQUEST,
         REGISTER_RESULT_PARTITION_LOCATION,
         REPORT_RESULT_PARTITION_WRITE_COMPLETION,
+        REPORT_RESULT_PARTITION_CONSUMED,

         NODE_REGISTRATION_RESULT,
         START_TASKS,
@@ -686,6 +687,39 @@
         }
     }

+    public static class ReportResultPartitionConsumedFunction extends Function 
{
+        private static final long serialVersionUID = 1L;
+
+        private final JobId jobId;
+
+        private final ResultSetId rsId;
+
+        private final int partition;
+
+        public ReportResultPartitionConsumedFunction(JobId jobId, ResultSetId 
rsId, int partition) {
+            this.jobId = jobId;
+            this.rsId = rsId;
+            this.partition = partition;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.REPORT_RESULT_PARTITION_CONSUMED;
+        }
+
+        public JobId getJobId() {
+            return jobId;
+        }
+
+        public ResultSetId getResultSetId() {
+            return rsId;
+        }
+
+        public int getPartition() {
+            return partition;
+        }
+    }
+
     public static class NodeRegistrationResult extends Function {
         private static final long serialVersionUID = 1L;

diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
index 09dc04d..5313dad 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
@@ -44,6 +44,7 @@
 import 
org.apache.hyracks.control.common.ipc.CCNCFunctions.RegisterResultPartitionLocationFunction;
 import 
org.apache.hyracks.control.common.ipc.CCNCFunctions.ReportDeployedJobSpecFailureFunction;
 import 
org.apache.hyracks.control.common.ipc.CCNCFunctions.ReportProfileFunction;
+import 
org.apache.hyracks.control.common.ipc.CCNCFunctions.ReportResultPartitionConsumedFunction;
 import 
org.apache.hyracks.control.common.ipc.CCNCFunctions.ReportResultPartitionWriteCompletionFunction;
 import 
org.apache.hyracks.control.common.ipc.CCNCFunctions.SendApplicationMessageFunction;
 import 
org.apache.hyracks.control.common.ipc.CCNCFunctions.ShutdownResponseFunction;
@@ -155,6 +156,12 @@
     }

     @Override
+    public void reportResultPartitionConsumed(JobId jobId, ResultSetId rsId, 
int partition) throws Exception {
+        ReportResultPartitionConsumedFunction fn = new 
ReportResultPartitionConsumedFunction(jobId, rsId, partition);
+        ipcHandle.send(-1, fn, null);
+    }
+
+    @Override
     public void notifyDeployedJobSpecFailure(DeployedJobSpecId 
deployedJobSpecId, String nodeId) throws Exception {
         ReportDeployedJobSpecFailureFunction fn = new 
ReportDeployedJobSpecFailureFunction(deployedJobSpecId, nodeId);
         ipcHandle.send(-1, fn, null);
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/ResultPartitionManager.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/ResultPartitionManager.java
index 165cb95..557b74b 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/ResultPartitionManager.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/ResultPartitionManager.java
@@ -110,6 +110,17 @@
     }

     @Override
+    public void reportPartitionConsumed(JobId jobId, ResultSetId rsId, int 
partition) throws HyracksException {
+        try {
+            LOGGER.trace("Reporting partition consumed: JobId: {}:ResultSetId: 
{}:partition: {}", jobId, rsId,
+                    partition);
+            
ncs.getClusterController(jobId.getCcId()).reportResultPartitionConsumed(jobId, 
rsId, partition);
+        } catch (Exception e) {
+            throw HyracksException.create(e);
+        }
+    }
+
+    @Override
     public void initializeResultPartitionReader(JobId jobId, ResultSetId 
resultSetId, int partition,
             IFrameWriter writer) throws HyracksException {
         ResultState resultState = getResultState(jobId, resultSetId, 
partition);
@@ -137,10 +148,12 @@
     }

     @Override
-    public synchronized void removePartition(JobId jobId, ResultSetId 
resultSetId, int partition) {
+    public synchronized void removePartition(JobId jobId, ResultSetId 
resultSetId, int partition)
+            throws HyracksException {
         ResultSetMap rsIdMap = partitionResultStateMap.get(jobId);
         if (rsIdMap != null && rsIdMap.removePartition(jobId, resultSetId, 
partition)) {
             partitionResultStateMap.remove(jobId);
+            reportPartitionConsumed(jobId, resultSetId, partition);
         }
     }

diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/ResultPartitionReader.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/ResultPartitionReader.java
index 3774530..43042b3 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/ResultPartitionReader.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/ResultPartitionReader.java
@@ -23,6 +23,7 @@

 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.HyracksException;
 import org.apache.hyracks.api.partitions.ResultSetPartitionId;
 import org.apache.hyracks.comm.channels.NetworkOutputChannel;
 import org.apache.hyracks.net.protocols.muxdemux.AbstractChannelWriteInterface;
@@ -102,8 +103,9 @@
                     final ResultSetPartitionId partitionId = 
resultState.getResultSetPartitionId();
                     
resultPartitionManager.removePartition(partitionId.getJobId(), 
partitionId.getResultSetId(),
                             partitionId.getPartition());
+
                 }
-            } catch (HyracksDataException e) {
+            } catch (HyracksException e) {
                 LOGGER.error("unexpected failure in partition reader clean 
up", e);
             }
         }

--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20292?usp=email
To unsubscribe, or for help writing mail filters, visit 
https://asterix-gerrit.ics.uci.edu/settings?usp=email

Gerrit-MessageType: merged
Gerrit-Project: asterixdb
Gerrit-Branch: phoenix
Gerrit-Change-Id: I768eab7bd3ed1dbfda1bd3449264c92952b4cb53
Gerrit-Change-Number: 20292
Gerrit-PatchSet: 7
Gerrit-Owner: Ian Maxon <[email protected]>
Gerrit-Reviewer: Anon. E. Moose #1000171
Gerrit-Reviewer: Ian Maxon <[email protected]>
Gerrit-Reviewer: Jenkins <[email protected]>
Gerrit-Reviewer: Murtadha Hubail <[email protected]>
Gerrit-Reviewer: Peeyush Gupta <[email protected]>

Reply via email to