Murtadha Hubail has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/2292

Change subject: [ASTERIXDB-2249][API] Add Result Max Reads to API
......................................................................

[ASTERIXDB-2249][API] Add Result Max Reads to API

- user model changes: no
- storage format changes: no
- interface changes: yes
  - IRequestParameters: add ResultProperties
  - IDatasetPartitionManager: add maxReads

Details:
- Add option to specify max result reads and default
  it to 1.
- Fix exception handling in DatasetPartitionReader.
- Add option to specify maxResultReads in tests.
- Use new option in async-repeated test.

Change-Id: I86f75c791f034142c5b046445870bd91378c5b3a
---
M 
asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IRequestParameters.java
A 
asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ResultProperties.java
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ApiServlet.java
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RestApiServlet.java
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/api/java/AsterixJavaClient.java
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/RequestParameters.java
M 
asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
M 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-repeated/async-repeated.1.async.sqlpp
M 
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
M 
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IDatasetPartitionManager.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionReader.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionWriter.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/ResultState.java
M 
hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java
M 
hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AggregationTest.java
M 
hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/CountOfCountsTest.java
M 
hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/HeapSortMergeTest.java
M 
hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/LocalityAwareConnectorTest.java
M 
hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/ReplicateOperatorTest.java
M 
hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/ScanPrintTest.java
M 
hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/SortMergeTest.java
M 
hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
M 
hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderNestedLoopJoinTest.java
M 
hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/UnionTest.java
29 files changed, 194 insertions(+), 80 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb 
refs/changes/92/2292/1

diff --git 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IRequestParameters.java
 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IRequestParameters.java
index 8d0f20b..6d8ba92 100644
--- 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IRequestParameters.java
+++ 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IRequestParameters.java
@@ -33,7 +33,7 @@
     /**
      * @return The {@code ResultDelivery} kind required for queries in the 
list of statements
      */
-    IStatementExecutor.ResultDelivery getResultDelivery();
+    ResultProperties getResultProperties();
 
     /**
      * @return a reference to write the stats of executed queries
diff --git 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ResultProperties.java
 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ResultProperties.java
new file mode 100644
index 0000000..4866c6d
--- /dev/null
+++ 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ResultProperties.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.translator;
+
+import java.io.Serializable;
+
+public class ResultProperties implements Serializable {
+
+    public static final long DEFAULT_MAX_READS = 1;
+    private final IStatementExecutor.ResultDelivery delivery;
+    private final long maxReads;
+
+    public ResultProperties(IStatementExecutor.ResultDelivery delivery) {
+        this(delivery, DEFAULT_MAX_READS);
+    }
+
+    public ResultProperties(IStatementExecutor.ResultDelivery delivery, long 
maxReads) {
+        this.delivery = delivery;
+        this.maxReads = maxReads;
+    }
+
+    public IStatementExecutor.ResultDelivery getDelivery() {
+        return delivery;
+    }
+
+    public long getMaxReads() {
+        return maxReads;
+    }
+
+    public ResultProperties getNcToCcResultProperties() {
+        if (delivery != IStatementExecutor.ResultDelivery.IMMEDIATE) {
+            return this;
+        }
+        // switch IMMEDIATE to DEFERRED since the result will be served by 
the NC
+        return new 
ResultProperties(IStatementExecutor.ResultDelivery.DEFERRED, maxReads);
+    }
+}
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ApiServlet.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ApiServlet.java
index 87c1c57..df2a2a1 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ApiServlet.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ApiServlet.java
@@ -48,6 +48,7 @@
 import org.apache.asterix.translator.IRequestParameters;
 import org.apache.asterix.translator.IStatementExecutor;
 import org.apache.asterix.translator.IStatementExecutorFactory;
+import org.apache.asterix.translator.ResultProperties;
 import org.apache.asterix.translator.SessionConfig;
 import org.apache.asterix.translator.SessionConfig.OutputFormat;
 import org.apache.asterix.translator.SessionConfig.PlanFormat;
@@ -164,7 +165,7 @@
             double duration;
             long startTime = System.currentTimeMillis();
             final IRequestParameters requestParameters =
-                    new RequestParameters(hds, 
IStatementExecutor.ResultDelivery.IMMEDIATE,
+                    new RequestParameters(hds, new 
ResultProperties(IStatementExecutor.ResultDelivery.IMMEDIATE),
                             new IStatementExecutor.Stats(), null, null, null);
             translator.compileAndExecute(hcc, null, requestParameters);
             long endTime = System.currentTimeMillis();
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
index 83a40f0..a137bdc 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
@@ -26,6 +26,8 @@
 import java.util.concurrent.TimeoutException;
 import java.util.function.Function;
 
+import javax.xml.transform.Result;
+
 import org.apache.asterix.algebra.base.ILangExtension;
 import org.apache.asterix.app.message.CancelQueryRequest;
 import org.apache.asterix.app.message.ExecuteStatementRequestMessage;
@@ -41,6 +43,7 @@
 import org.apache.asterix.common.messaging.api.MessageFuture;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.translator.IStatementExecutor;
+import org.apache.asterix.translator.ResultProperties;
 import org.apache.asterix.translator.SessionOutput;
 import org.apache.commons.lang3.tuple.Triple;
 import org.apache.hyracks.api.application.INCServiceContext;
@@ -66,14 +69,12 @@
 
     @Override
     protected void executeStatement(String statementsText, SessionOutput 
sessionOutput,
-            IStatementExecutor.ResultDelivery delivery, 
IStatementExecutor.Stats stats, RequestParameters param,
+            ResultProperties resultProperties, IStatementExecutor.Stats stats, 
RequestParameters param,
             RequestExecutionState execution, Map<String, String> 
optionalParameters) throws Exception {
         // Running on NC -> send 'execute' message to CC
         INCServiceContext ncCtx = (INCServiceContext) serviceCtx;
         INCMessageBroker ncMb = (INCMessageBroker) ncCtx.getMessageBroker();
-        IStatementExecutor.ResultDelivery ccDelivery =
-                delivery == IStatementExecutor.ResultDelivery.IMMEDIATE ? 
IStatementExecutor.ResultDelivery.DEFERRED
-                        : delivery;
+        final IStatementExecutor.ResultDelivery delivery = 
resultProperties.getDelivery();
         ExecuteStatementResponseMessage responseMsg;
         MessageFuture responseFuture = ncMb.registerMessageFuture();
         final String handleUrl = getHandleUrl(param.host, param.path, 
delivery);
@@ -86,8 +87,8 @@
                 timeout = 
TimeUnit.NANOSECONDS.toMillis(Duration.parseDurationStringToNanos(param.timeout));
             }
             ExecuteStatementRequestMessage requestMsg = new 
ExecuteStatementRequestMessage(ncCtx.getNodeId(),
-                    responseFuture.getFutureId(), queryLanguage, 
statementsText, sessionOutput.config(), ccDelivery,
-                    param.clientContextID, handleUrl, optionalParameters);
+                    responseFuture.getFutureId(), queryLanguage, 
statementsText, sessionOutput.config(),
+                    resultProperties.getNcToCcResultProperties(), 
param.clientContextID, handleUrl, optionalParameters);
             execution.start();
             ncMb.sendMessageToCC(requestMsg);
             try {
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
index a8f14b5..3a68e83 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
@@ -21,7 +21,6 @@
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.net.InetAddress;
-import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
 import java.util.List;
 import java.util.Map;
@@ -48,6 +47,7 @@
 import org.apache.asterix.translator.IStatementExecutor.Stats;
 import org.apache.asterix.translator.IStatementExecutorContext;
 import org.apache.asterix.translator.IStatementExecutorFactory;
+import org.apache.asterix.translator.ResultProperties;
 import org.apache.asterix.translator.SessionConfig;
 import org.apache.asterix.translator.SessionOutput;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -137,7 +137,8 @@
         PRETTY("pretty"),
         MODE("mode"),
         TIMEOUT("timeout"),
-        PLAN_FORMAT("plan-format");
+        PLAN_FORMAT("plan-format"),
+        MAX_RESULT_READS("max_result_reads");
 
         private final String str;
 
@@ -193,6 +194,7 @@
         boolean pretty;
         String clientContextID;
         String mode;
+        String maxResultReads;
 
         @Override
         public String toString() {
@@ -207,6 +209,7 @@
                 on.put("clientContextID", clientContextID);
                 on.put("format", format);
                 on.put("timeout", timeout);
+                on.put("maxResultReads", maxResultReads);
                 return om.writer(new 
MinimalPrettyPrinter()).writeValueAsString(on);
             } catch (JsonProcessingException e) { // NOSONAR
                 return e.getMessage();
@@ -383,6 +386,7 @@
                 param.mode = toLower(getOptText(jsonRequest, 
Parameter.MODE.str()));
                 param.clientContextID = getOptText(jsonRequest, 
Parameter.CLIENT_ID.str());
                 param.timeout = getOptText(jsonRequest, 
Parameter.TIMEOUT.str());
+                param.maxResultReads = getOptText(jsonRequest, 
Parameter.MAX_RESULT_READS.str());
             } catch (JsonParseException | JsonMappingException e) {
                 // if the JSON parsing fails, the statement is empty and we 
get an empty statement error
                 GlobalConfig.ASTERIX_LOGGER.log(Level.ERROR, e.getMessage(), 
e);
@@ -397,6 +401,7 @@
             param.mode = toLower(request.getParameter(Parameter.MODE.str()));
             param.clientContextID = 
request.getParameter(Parameter.CLIENT_ID.str());
             param.timeout = request.getParameter(Parameter.TIMEOUT.str());
+            param.maxResultReads = 
request.getParameter(Parameter.MAX_RESULT_READS.str());
         }
         return param;
     }
@@ -448,6 +453,10 @@
 
         ResultDelivery delivery = parseResultDelivery(param.mode);
 
+        final ResultProperties resultProperties = param.maxResultReads == null 
?
+                new ResultProperties(delivery) :
+                new ResultProperties(delivery, 
Long.parseLong(param.maxResultReads));
+
         String handleUrl = getHandleUrl(param.host, param.path, delivery);
         SessionOutput sessionOutput = createSessionOutput(param, handleUrl, 
httpWriter);
         SessionConfig sessionConfig = sessionOutput.config();
@@ -478,7 +487,7 @@
                     "http://" + hostName + ":" + 
appCtx.getExternalProperties().getQueryWebInterfacePort());
             response.setHeader("Access-Control-Allow-Headers", "Origin, 
X-Requested-With, Content-Type, Accept");
             response.setStatus(execution.getHttpStatus());
-            executeStatement(statementsText, sessionOutput, delivery, stats, 
param, execution, optionalParams);
+            executeStatement(statementsText, sessionOutput, resultProperties, 
stats, param, execution, optionalParams);
             if (ResultDelivery.IMMEDIATE == delivery || 
ResultDelivery.DEFERRED == delivery) {
                 ResultUtil.printStatus(sessionOutput, 
execution.getResultStatus());
             }
@@ -502,9 +511,9 @@
         }
     }
 
-    protected void executeStatement(String statementsText, SessionOutput 
sessionOutput, ResultDelivery delivery,
-            IStatementExecutor.Stats stats, RequestParameters param, 
RequestExecutionState execution,
-            Map<String, String> optionalParameters) throws Exception {
+    protected void executeStatement(String statementsText, SessionOutput 
sessionOutput,
+            ResultProperties resultProperties, IStatementExecutor.Stats stats, 
RequestParameters param,
+            RequestExecutionState execution, Map<String, String> 
optionalParameters) throws Exception {
         IClusterManagementWork.ClusterState clusterState =
                 ((ICcApplicationContext) 
appCtx).getClusterStateManager().getState();
         if (clusterState != IClusterManagementWork.ClusterState.ACTIVE) {
@@ -518,8 +527,8 @@
                 sessionOutput, compilationProvider, componentProvider);
         execution.start();
         final IRequestParameters requestParameters =
-                new 
org.apache.asterix.app.translator.RequestParameters(getHyracksDataset(), 
delivery, stats, null,
-                        param.clientContextID, optionalParameters);
+                new 
org.apache.asterix.app.translator.RequestParameters(getHyracksDataset(), 
resultProperties, stats,
+                        null, param.clientContextID, optionalParameters);
         translator.compileAndExecute(getHyracksClientConnection(), queryCtx, 
requestParameters);
         execution.end();
     }
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RestApiServlet.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RestApiServlet.java
index 3359b9f..360c522 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RestApiServlet.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RestApiServlet.java
@@ -42,6 +42,7 @@
 import org.apache.asterix.translator.IStatementExecutor;
 import org.apache.asterix.translator.IStatementExecutor.ResultDelivery;
 import org.apache.asterix.translator.IStatementExecutorFactory;
+import org.apache.asterix.translator.ResultProperties;
 import org.apache.asterix.translator.SessionConfig;
 import org.apache.asterix.translator.SessionConfig.OutputFormat;
 import org.apache.asterix.translator.SessionConfig.PlanFormat;
@@ -209,7 +210,8 @@
             IStatementExecutor translator = 
statementExecutorFactory.create(appCtx, aqlStatements, sessionOutput,
                     compilationProvider, componentProvider);
             final IRequestParameters requestParameters =
-                    new RequestParameters(hds, resultDelivery, new 
IStatementExecutor.Stats(), null, null, null);
+                    new RequestParameters(hds, new 
ResultProperties(resultDelivery), new IStatementExecutor.Stats(),
+                            null, null, null);
             translator.compileAndExecute(hcc, null, requestParameters);
         } catch (AsterixException | TokenMgrError | 
org.apache.asterix.aqlplus.parser.TokenMgrError pe) {
             response.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR);
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/java/AsterixJavaClient.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/java/AsterixJavaClient.java
index 58a7f09..4ecd978 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/java/AsterixJavaClient.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/java/AsterixJavaClient.java
@@ -35,6 +35,7 @@
 import org.apache.asterix.translator.IRequestParameters;
 import org.apache.asterix.translator.IStatementExecutor;
 import org.apache.asterix.translator.IStatementExecutorFactory;
+import org.apache.asterix.translator.ResultProperties;
 import org.apache.asterix.translator.SessionConfig;
 import org.apache.asterix.translator.SessionConfig.OutputFormat;
 import org.apache.asterix.translator.SessionConfig.PlanFormat;
@@ -120,8 +121,8 @@
         IStatementExecutor translator = 
statementExecutorFactory.create(appCtx, statements, output, compilationProvider,
                 storageComponentProvider);
         final IRequestParameters requestParameters =
-                new RequestParameters(null, 
IStatementExecutor.ResultDelivery.IMMEDIATE, new IStatementExecutor.Stats(),
-                        null, null, null);
+                new RequestParameters(null, new 
ResultProperties(IStatementExecutor.ResultDelivery.IMMEDIATE),
+                        new IStatementExecutor.Stats(), null, null, null);
         translator.compileAndExecute(hcc, null, requestParameters);
         writer.flush();
     }
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
index 0b8c34c..5b0eb97 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
@@ -46,6 +46,7 @@
 import org.apache.asterix.translator.IStatementExecutor;
 import org.apache.asterix.translator.IStatementExecutorContext;
 import org.apache.asterix.translator.IStatementExecutorFactory;
+import org.apache.asterix.translator.ResultProperties;
 import org.apache.asterix.translator.SessionConfig;
 import org.apache.asterix.translator.SessionOutput;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -69,20 +70,20 @@
     private final ILangExtension.Language lang;
     private final String statementsText;
     private final SessionConfig sessionConfig;
-    private final IStatementExecutor.ResultDelivery delivery;
+    private final ResultProperties resultProperties;
     private final String clientContextID;
     private final String handleUrl;
     private final Map<String, String> optionalParameters;
 
     public ExecuteStatementRequestMessage(String requestNodeId, long 
requestMessageId, ILangExtension.Language lang,
-            String statementsText, SessionConfig sessionConfig, 
IStatementExecutor.ResultDelivery delivery,
+            String statementsText, SessionConfig sessionConfig, 
ResultProperties resultProperties,
             String clientContextID, String handleUrl, Map<String, String> 
optionalParameters) {
         this.requestNodeId = requestNodeId;
         this.requestMessageId = requestMessageId;
         this.lang = lang;
         this.statementsText = statementsText;
         this.sessionConfig = sessionConfig;
-        this.delivery = delivery;
+        this.resultProperties = resultProperties;
         this.clientContextID = clientContextID;
         this.handleUrl = handleUrl;
         this.optionalParameters = optionalParameters;
@@ -122,7 +123,8 @@
                     compilationProvider, storageComponentProvider);
             final IStatementExecutor.Stats stats = new 
IStatementExecutor.Stats();
             final IRequestParameters requestParameters =
-                    new RequestParameters(null, delivery, stats, outMetadata, 
clientContextID, optionalParameters);
+                    new RequestParameters(null, resultProperties, stats, 
outMetadata, clientContextID,
+                            optionalParameters);
             translator.compileAndExecute(ccApp.getHcc(), 
statementExecutorContext, requestParameters);
             outPrinter.close();
             responseMsg.setResult(outWriter.toString());
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 9b96883..4e9cb47 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -283,7 +283,8 @@
         Thread.currentThread().setName(QueryTranslator.class.getSimpleName());
         Map<String, String> config = new HashMap<>();
         final IHyracksDataset hdc = requestParameters.getHyracksDataset();
-        final ResultDelivery resultDelivery = 
requestParameters.getResultDelivery();
+        final ResultDelivery resultDelivery = 
requestParameters.getResultProperties().getDelivery();
+        final long maxResultReads = 
requestParameters.getResultProperties().getMaxReads();
         final Stats stats = requestParameters.getStats();
         final ResultMetadata outMetadata = requestParameters.getOutMetadata();
         final String clientContextId = requestParameters.getClientContextId();
@@ -351,6 +352,7 @@
                             metadataProvider.setResultSetId(new 
ResultSetId(resultSetIdCounter++));
                             metadataProvider.setResultAsyncMode(resultDelivery 
== ResultDelivery.ASYNC
                                     || resultDelivery == 
ResultDelivery.DEFERRED);
+                            metadataProvider.setMaxResultReads(maxResultReads);
                         }
                         handleInsertUpsertStatement(metadataProvider, stmt, 
hcc, hdc, resultDelivery, outMetadata,
                                 stats, false, clientContextId);
@@ -386,6 +388,7 @@
                         metadataProvider.setResultSetId(new 
ResultSetId(resultSetIdCounter++));
                         metadataProvider.setResultAsyncMode(
                                 resultDelivery == ResultDelivery.ASYNC || 
resultDelivery == ResultDelivery.DEFERRED);
+                        metadataProvider.setMaxResultReads(maxResultReads);
                         handleQuery(metadataProvider, (Query) stmt, hcc, hdc, 
resultDelivery, outMetadata, stats,
                                 clientContextId, ctx);
                         break;
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/RequestParameters.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/RequestParameters.java
index 5b8da8b..9592492 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/RequestParameters.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/RequestParameters.java
@@ -22,24 +22,24 @@
 
 import org.apache.asterix.translator.IRequestParameters;
 import org.apache.asterix.translator.IStatementExecutor;
-import org.apache.asterix.translator.IStatementExecutor.ResultDelivery;
 import org.apache.asterix.translator.IStatementExecutor.Stats;
+import org.apache.asterix.translator.ResultProperties;
 import org.apache.hyracks.api.dataset.IHyracksDataset;
 
 public class RequestParameters implements IRequestParameters {
 
     private final IHyracksDataset hdc;
-    private final ResultDelivery resultDelivery;
+    private final ResultProperties resultProperties;
     private final Stats stats;
     private final Map<String, String> optionalParameters;
     private final IStatementExecutor.ResultMetadata outMetadata;
     private final String clientContextId;
 
-    public RequestParameters(IHyracksDataset hdc, ResultDelivery 
resultDelivery, Stats stats,
+    public RequestParameters(IHyracksDataset hdc, ResultProperties 
resultProperties, Stats stats,
             IStatementExecutor.ResultMetadata outMetadata, String 
clientContextId,
             Map<String, String> optionalParameters) {
         this.hdc = hdc;
-        this.resultDelivery = resultDelivery;
+        this.resultProperties = resultProperties;
         this.stats = stats;
         this.outMetadata = outMetadata;
         this.clientContextId = clientContextId;
@@ -52,8 +52,8 @@
     }
 
     @Override
-    public IStatementExecutor.ResultDelivery getResultDelivery() {
-        return resultDelivery;
+    public ResultProperties getResultProperties() {
+        return resultProperties;
     }
 
     @Override
diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
index 30336d1..6db7ff7 100644
--- 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
@@ -122,7 +122,7 @@
     private static final Pattern HTTP_PARAM_PATTERN = Pattern.compile("param 
(\\w+)=(.*)", Pattern.MULTILINE);
     private static final Pattern HTTP_BODY_PATTERN = 
Pattern.compile("body=(.*)", Pattern.MULTILINE);
     private static final Pattern HTTP_STATUSCODE_PATTERN = 
Pattern.compile("statuscode (.*)", Pattern.MULTILINE);
-
+    private static final Pattern MAX_RESULT_READS_PATTERN = 
Pattern.compile("maxresultreads=(\\d+)(\\D|$)", Pattern.MULTILINE);
     public static final int TRUNCATE_THRESHOLD = 16384;
 
     public static final String DELIVERY_ASYNC = "async";
@@ -556,7 +556,11 @@
 
     public InputStream executeQueryService(String str, OutputFormat fmt, URI 
uri, List<Parameter> params,
             boolean jsonEncoded, Predicate<Integer> responseCodeValidator, 
boolean cancellable) throws Exception {
-        final List<Parameter> newParams = upsertParam(params, "format", 
fmt.mimeType());
+        List<Parameter> newParams = upsertParam(params, "format", 
fmt.mimeType());
+        final Optional<String> maxReadsOptional = extractMaxResultReads(str);
+        if (maxReadsOptional.isPresent()) {
+            newParams = upsertParam(newParams, "max_result_reads", 
maxReadsOptional.get());
+        }
         HttpUriRequest method = jsonEncoded ? constructPostMethodJson(str, 
uri, "statement", newParams)
                 : constructPostMethodUrl(str, uri, "statement", newParams);
         // Set accepted output response type
@@ -1392,6 +1396,14 @@
         return tmpStmt;
     }
 
+    protected static Optional<String> extractMaxResultReads(String statement) {
+        final Matcher m = MAX_RESULT_READS_PATTERN.matcher(statement);
+        while (m.find()) {
+            return Optional.of(m.group(1));
+        }
+        return Optional.empty();
+    }
+
     protected static Optional<String> extractBody(String statement) {
         final Matcher m = HTTP_BODY_PATTERN.matcher(statement);
         while (m.find()) {
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-repeated/async-repeated.1.async.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-repeated/async-repeated.1.async.sqlpp
index 1e18f66..8055915 100644
--- 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-repeated/async-repeated.1.async.sqlpp
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-repeated/async-repeated.1.async.sqlpp
@@ -17,6 +17,7 @@
  * under the License.
  */
 
+-- maxresultreads=2
 -- handlevariable=status
 
 select i, i * i as i2 from range(1, 10) i;
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index b8790e5..6f58b0a 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -153,6 +153,7 @@
     private IAWriterFactory writerFactory;
     private FileSplit outputFile;
     private boolean asyncResults;
+    private long maxResultReads;
     private ResultSetId resultSetId;
     private IResultSerializerFactoryProvider resultSerializerFactoryProvider;
     private TxnId txnId;
@@ -236,6 +237,14 @@
 
     public void setResultAsyncMode(boolean asyncResults) {
         this.asyncResults = asyncResults;
+    }
+
+    public void setMaxResultReads(long maxResultReads) {
+        this.maxResultReads = maxResultReads; // later handed to the result writer via getMaxResultReads()
+    }
+
+    public long getMaxResultReads() {
+        return maxResultReads; // NOTE(review): no field initializer, presumably 0 until set; ResultState rejects maxReads <= 0
     }
 
     public ResultSetId getResultSetId() {
@@ -536,7 +545,7 @@
             IResultSerializerFactory resultSerializedAppenderFactory = resultSerializerFactoryProvider
                     .getAqlResultSerializerFactoryProvider(printColumns, printerFactories, getWriterFactory());
             resultWriter = new ResultWriterOperatorDescriptor(spec, rsId, ordered, getResultAsyncMode(),
-                    resultSerializedAppenderFactory);
+                    resultSerializedAppenderFactory, getMaxResultReads());
         } catch (IOException e) {
             throw new AlgebricksException(e);
         }
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IDatasetPartitionManager.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IDatasetPartitionManager.java
index 008f0be..e6cf6d3 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IDatasetPartitionManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IDatasetPartitionManager.java
@@ -25,7 +25,7 @@
 
 public interface IDatasetPartitionManager extends IDatasetManager {
     IFrameWriter createDatasetPartitionWriter(IHyracksTaskContext ctx, ResultSetId rsId, boolean orderedResult,
-            boolean asyncMode, int partition, int nPartitions) throws HyracksException;
+            boolean asyncMode, int partition, int nPartitions, long maxReads) throws HyracksException;
 
     void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, int partition, int nPartitions,
             boolean orderedResult, boolean emptyResult) throws HyracksException;
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java
index bc980e1..d381a67 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java
@@ -70,12 +70,12 @@
 
     @Override
     public IFrameWriter createDatasetPartitionWriter(IHyracksTaskContext ctx, ResultSetId rsId, boolean orderedResult,
-            boolean asyncMode, int partition, int nPartitions) throws HyracksException {
+            boolean asyncMode, int partition, int nPartitions, long maxReads) {
         DatasetPartitionWriter dpw;
         JobId jobId = ctx.getJobletContext().getJobId();
         synchronized (this) {
             dpw = new DatasetPartitionWriter(ctx, this, jobId, rsId, asyncMode, orderedResult, partition, nPartitions,
-                    datasetMemoryManager, fileFactory);
+                    datasetMemoryManager, fileFactory, maxReads);
 
             ResultSetMap rsIdMap = partitionResultStateMap.computeIfAbsent(jobId, k -> new ResultSetMap());
 
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionReader.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionReader.java
index ec33b05..26c19e8 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionReader.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionReader.java
@@ -31,11 +31,8 @@
     private static final Logger LOGGER = LogManager.getLogger();
 
     private final DatasetPartitionManager datasetPartitionManager;
-
     private final DatasetMemoryManager datasetMemoryManager;
-
     private final Executor executor;
-
     private final ResultState resultState;
 
     public DatasetPartitionReader(DatasetPartitionManager datasetPartitionManager,
@@ -64,26 +61,32 @@
                             if (size <= 0) {
                                 break;
                             } else if (size < buffer.limit()) {
-                                throw new HyracksDataException("Premature end of file - readSize: " + size
-                                        + " buffer limit: " + buffer.limit());
+                                throw new IllegalStateException(
+                                        "Premature end of file - readSize: " + size + " buffer limit: " + buffer
+                                                .limit());
                             }
                             offset += size;
                             buffer.flip();
                             channel.nextFrame(buffer);
                         }
-                        LOGGER.info("Result Reader read + " + offset + " bytes");
+                        if (LOGGER.isInfoEnabled()) {
+                            LOGGER.info("Result Reader read + " + offset + " bytes");
+                        }
+                    } catch (Exception e) {
+                        LOGGER.error("partition reader failed", e);
+                        channel.abort();
                     } finally {
                         channel.close();
                         resultState.readClose();
-                        // If the query is a synchronous query, remove its partition as soon as it is read.
-                        if (!resultState.getAsyncMode()) {
+                        // if resultState has been exhausted, delete the result partition
+                        if (resultState.isExhausted()) {
                             datasetPartitionManager.removePartition(resultState.getResultSetPartitionId().getJobId(),
-                                    resultState.getResultSetPartitionId().getResultSetId(), resultState
-                                            .getResultSetPartitionId().getPartition());
+                                    resultState.getResultSetPartitionId().getResultSetId(),
+                                    resultState.getResultSetPartitionId().getPartition());
                         }
                     }
                 } catch (HyracksDataException e) {
-                    throw new RuntimeException(e);
+                    LOGGER.error("unexpected failure in partition reader", e);
                 }
                 if (LOGGER.isInfoEnabled()) {
                     LOGGER.info("result reading successful(" + resultState.getResultSetPartitionId() + ")");
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionWriter.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionWriter.java
index 2bf5326..d49a1a6 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionWriter.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionWriter.java
@@ -59,7 +59,7 @@
 
     public DatasetPartitionWriter(IHyracksTaskContext ctx, IDatasetPartitionManager manager, JobId jobId,
             ResultSetId rsId, boolean asyncMode, boolean orderedResult, int partition, int nPartitions,
-            DatasetMemoryManager datasetMemoryManager, IWorkspaceFileFactory fileFactory) {
+            DatasetMemoryManager datasetMemoryManager, IWorkspaceFileFactory fileFactory, long maxReads) {
         this.manager = manager;
         this.jobId = jobId;
         this.resultSetId = rsId;
@@ -70,7 +70,7 @@
 
         resultSetPartitionId = new ResultSetPartitionId(jobId, rsId, partition);
         resultState = new ResultState(resultSetPartitionId, asyncMode, ctx.getIoManager(), fileFactory,
-                ctx.getInitialFrameSize());
+                ctx.getInitialFrameSize(), maxReads);
     }
 
     public ResultState getResultState() {
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/ResultState.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/ResultState.java
index 43b1e9b..3e3f06b 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/ResultState.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/ResultState.java
@@ -68,17 +68,22 @@
     private long size;
 
     private long persistentSize;
+    private long remainingReads;
 
     ResultState(ResultSetPartitionId resultSetPartitionId, boolean asyncMode, IIOManager ioManager,
-            IWorkspaceFileFactory fileFactory, int frameSize) {
+            IWorkspaceFileFactory fileFactory, int frameSize, long maxReads) {
+        if (maxReads <= 0) {
+            throw new IllegalArgumentException("maxReads must be > 0");
+        }
         this.resultSetPartitionId = resultSetPartitionId;
         this.asyncMode = asyncMode;
         this.ioManager = ioManager;
         this.fileFactory = fileFactory;
         this.frameSize = frameSize;
+        remainingReads = maxReads;
         eos = new AtomicBoolean(false);
         failed = new AtomicBoolean(false);
-        localPageList = new ArrayList<Page>();
+        localPageList = new ArrayList<>();
 
         fileRef = null;
         writeFileHandle = null;
@@ -102,6 +107,7 @@
         closeWriteFileHandle();
         if (fileRef != null) {
             fileRef.delete();
+            fileRef = null;
         }
     }
 
@@ -152,7 +158,10 @@
     }
 
     public synchronized void readOpen() {
-        // It is a noOp for now, leaving here to keep the API stable for future usage.
+        if (isExhausted()) {
+            throw new IllegalStateException("Result reads exhausted");
+        }
+        remainingReads--;
     }
 
     public synchronized void readClose() throws HyracksDataException {
@@ -339,6 +348,7 @@
             ObjectNode on = om.createObjectNode();
             on.put("rspid", resultSetPartitionId.toString());
             on.put("async", asyncMode);
+            on.put("remainingReads", remainingReads);
             on.put("eos", eos.get());
             on.put("failed", failed.get());
             on.put("fileRef", String.valueOf(fileRef));
@@ -347,4 +357,8 @@
             return e.getMessage();
         }
     }
+
+    public synchronized boolean isExhausted() {
+        return remainingReads == 0; // counter starts at maxReads and is decremented by each readOpen()
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java
index 58eee79..d081bdb 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java
@@ -51,14 +51,16 @@
     private final boolean asyncMode;
 
     private final IResultSerializerFactory resultSerializerFactory;
+    private final long maxReads;
 
     public ResultWriterOperatorDescriptor(IOperatorDescriptorRegistry spec, ResultSetId rsId, boolean ordered,
-            boolean asyncMode, IResultSerializerFactory resultSerializerFactory) throws IOException {
+            boolean asyncMode, IResultSerializerFactory resultSerializerFactory, long maxReads) throws IOException {
         super(spec, 1, 0);
         this.rsId = rsId;
         this.ordered = ordered;
         this.asyncMode = asyncMode;
         this.resultSerializerFactory = resultSerializerFactory;
+        this.maxReads = maxReads;
     }
 
     @Override
@@ -87,7 +89,7 @@
             public void open() throws HyracksDataException {
                 try {
                     datasetPartitionWriter = dpm.createDatasetPartitionWriter(ctx, rsId, ordered, asyncMode, partition,
-                            nPartitions);
+                            nPartitions, maxReads);
                     datasetPartitionWriter.open();
                     resultSerializer.init();
                 } catch (HyracksException e) {
@@ -139,7 +141,8 @@
                 sb.append("{ ");
                 sb.append("\"rsId\": \"").append(rsId).append("\", ");
                 sb.append("\"ordered\": ").append(ordered).append(", ");
-                sb.append("\"asyncMode\": ").append(asyncMode).append(" }");
+                sb.append("\"asyncMode\": ").append(asyncMode).append(", ");
+                sb.append("\"maxReads\": ").append(maxReads).append(" }");
                 return sb.toString();
             }
         };
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AggregationTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AggregationTest.java
index f169054..080746c 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AggregationTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AggregationTest.java
@@ -100,7 +100,7 @@
 
         ResultSetId rsId = new ResultSetId(1);
         AbstractSingleActivityOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, true, false,
-                ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
+                ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 1);
         spec.addResultSetId(rsId);
 
         return printer;
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/CountOfCountsTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/CountOfCountsTest.java
index b5c6238..c05b504 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/CountOfCountsTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/CountOfCountsTest.java
@@ -101,7 +101,7 @@
 
         ResultSetId rsId = new ResultSetId(1);
         IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, true, false,
-                ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
+                ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 1);
         spec.addResultSetId(rsId);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID);
 
@@ -173,7 +173,7 @@
 
         ResultSetId rsId = new ResultSetId(1);
         IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, true, false,
-                ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
+                ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 1);
         spec.addResultSetId(rsId);
 
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
@@ -246,7 +246,7 @@
 
         ResultSetId rsId = new ResultSetId(1);
         IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, true, false,
-                ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
+                ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 1);
         spec.addResultSetId(rsId);
 
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/HeapSortMergeTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/HeapSortMergeTest.java
index 56bf853..b693b09 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/HeapSortMergeTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/HeapSortMergeTest.java
@@ -159,7 +159,7 @@
         spec.addResultSetId(rsId);
 
         IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false,
-                ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
+                ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 1);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
 
         spec.connect(new OneToOneConnectorDescriptor(spec), ordScanner, 0, sorter, 0);
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/LocalityAwareConnectorTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/LocalityAwareConnectorTest.java
index 67642f4..67845c0 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/LocalityAwareConnectorTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/LocalityAwareConnectorTest.java
@@ -229,7 +229,7 @@
 
         ResultSetId rsId = new ResultSetId(1);
         AbstractSingleActivityOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, true, false,
-                ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
+                ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 1);
         spec.addResultSetId(rsId);
 
         return printer;
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/ReplicateOperatorTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/ReplicateOperatorTest.java
index b62c011..d7d4219 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/ReplicateOperatorTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/ReplicateOperatorTest.java
@@ -95,7 +95,7 @@
             spec.addResultSetId(rsId);
 
             outputOp[i] = new ResultWriterOperatorDescriptor(spec, rsId, true, false,
-                    ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
+                    ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 1);
             PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, outputOp[i], locations);
         }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/ScanPrintTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/ScanPrintTest.java
index dc91dd2..06d7b04 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/ScanPrintTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/ScanPrintTest.java
@@ -71,7 +71,7 @@
 
         ResultSetId rsId = new ResultSetId(1);
         IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, true, false,
-                ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
+                ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 1);
         spec.addResultSetId(rsId);
 
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
@@ -107,7 +107,7 @@
 
         ResultSetId rsId = new ResultSetId(1);
         IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, true, false,
-                ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
+                ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 1);
         spec.addResultSetId(rsId);
 
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID);
@@ -146,7 +146,7 @@
 
         ResultSetId rsId = new ResultSetId(1);
         IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, true, false,
-                ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
+                ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 1);
         spec.addResultSetId(rsId);
 
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID);
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/SortMergeTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/SortMergeTest.java
index 3043cba..df9c0d7 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/SortMergeTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/SortMergeTest.java
@@ -85,7 +85,7 @@
         spec.addResultSetId(rsId);
 
         IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, true, false,
-                ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
+                ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 1);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
 
         spec.connect(new OneToOneConnectorDescriptor(spec), ordScanner, 0, sorter, 0);
@@ -135,7 +135,7 @@
         spec.addResultSetId(rsId);
 
         IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false,
-                ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
+                ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 1);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
 
         spec.connect(new OneToOneConnectorDescriptor(spec), ordScanner, 0, sorter, 0);
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
index 7075fe9..2c055c2 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
@@ -132,7 +132,7 @@
         spec.addResultSetId(rsId);
 
         IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false,
-                ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
+                ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 1);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
 
         IConnectorDescriptor ordJoinConn = new MToNBroadcastConnectorDescriptor(spec);
@@ -215,7 +215,7 @@
         spec.addResultSetId(rsId);
 
         IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false,
-                ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
+                ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 1);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
 
         IConnectorDescriptor ordJoinConn = new MToNBroadcastConnectorDescriptor(spec);
@@ -300,7 +300,7 @@
         spec.addResultSetId(rsId);
 
         IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false,
-                ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
+                ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 1);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
 
         IConnectorDescriptor ordJoinConn = new MToNBroadcastConnectorDescriptor(spec);
@@ -385,7 +385,7 @@
         spec.addResultSetId(rsId);
 
         IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false,
-                ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
+                ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 1);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
 
         IConnectorDescriptor ordJoinConn = new MToNBroadcastConnectorDescriptor(spec);
@@ -471,7 +471,7 @@
         spec.addResultSetId(rsId);
 
         IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false,
-                ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
+                ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 1);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
 
         IConnectorDescriptor ordJoinConn = new MToNPartitioningConnectorDescriptor(spec,
@@ -563,7 +563,7 @@
         spec.addResultSetId(rsId);
 
         IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false,
-                ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
+                ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 1);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
 
         IConnectorDescriptor ordJoinConn = new MToNPartitioningConnectorDescriptor(spec,
@@ -654,7 +654,7 @@
         spec.addResultSetId(rsId);
 
         IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false,
-                ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
+                ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 1);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
 
         IConnectorDescriptor ordJoinConn = new MToNPartitioningConnectorDescriptor(spec,
@@ -750,7 +750,7 @@
         spec.addResultSetId(rsId);
 
         IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false,
-                ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
+                ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 1);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
 
         IConnectorDescriptor ordPartConn = new MToNPartitioningConnectorDescriptor(spec,
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderNestedLoopJoinTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderNestedLoopJoinTest.java
index 03cd5d4..dc5d0bc 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderNestedLoopJoinTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderNestedLoopJoinTest.java
@@ -182,7 +182,7 @@
         spec.addResultSetId(rsId);
 
         IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false,
-                ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
+                ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 1);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
 
         IConnectorDescriptor ordJoinConn = new MToNBroadcastConnectorDescriptor(spec);
@@ -263,7 +263,7 @@
         spec.addResultSetId(rsId);
 
         IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false,
-                ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
+                ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 1);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
 
         IConnectorDescriptor ordJoinConn = new OneToOneConnectorDescriptor(spec);
@@ -344,7 +344,7 @@
         spec.addResultSetId(rsId);
 
         IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false,
-                ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
+                ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 1);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
 
         IConnectorDescriptor ordJoinConn = new OneToOneConnectorDescriptor(spec);
@@ -430,7 +430,7 @@
         spec.addResultSetId(rsId);
 
         IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false,
-                ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
+                ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 1);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
 
         IConnectorDescriptor ordJoinConn = new OneToOneConnectorDescriptor(spec);
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/UnionTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/UnionTest.java
index 542f037..e4d6398 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/UnionTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/UnionTest.java
@@ -79,7 +79,7 @@
         spec.addResultSetId(rsId);
 
         IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false,
-                ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
+                ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 1);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
 
         spec.connect(new OneToOneConnectorDescriptor(spec), csvScanner01, 0, unionAll, 0);

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/2292
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I86f75c791f034142c5b046445870bd91378c5b3a
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Murtadha Hubail <[email protected]>

Reply via email to