Till Westmann has submitted this change and it was merged. Change subject: [ASTERIXDB-2165] Avoid OOM in QueryServiceServlet ......................................................................
[ASTERIXDB-2165] Avoid OOM in QueryServiceServlet - user model changes: no - storage format changes: no - interface change: no Change-Id: I74f61941f2e75e10f2accd6b2e6be6c1c0cd1490 Reviewed-on: https://asterix-gerrit.ics.uci.edu/2150 Sonar-Qube: Jenkins <[email protected]> Integration-Tests: Jenkins <[email protected]> Tested-by: Jenkins <[email protected]> Reviewed-by: Murtadha Hubail <[email protected]> --- M asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SessionOutput.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java 4 files changed, 50 insertions(+), 22 deletions(-) Approvals: Anon. E. Moose #1000171: No violations found Jenkins: Verified; No violations found; Verified Murtadha Hubail: Looks good to me, approved diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SessionOutput.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SessionOutput.java index b559df8..f7031a4 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SessionOutput.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SessionOutput.java @@ -20,6 +20,7 @@ package org.apache.asterix.translator; import java.io.PrintWriter; +import java.io.StringWriter; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.algebricks.core.algebra.prettyprint.AlgebricksAppendable; @@ -29,6 +30,8 @@ // Output path for primary execution. private final PrintWriter out; + private StringWriter buffer; + private PrintWriter bufferedOut; private final SessionOutput.ResultDecorator preResultDecorator; private final SessionOutput.ResultDecorator postResultDecorator; @@ -53,7 +56,31 @@ * Retrieve the PrintWriter to produce output to. */ public PrintWriter out() { - return this.out; + return this.bufferedOut != null ? this.bufferedOut : this.out; + } + + /** + * buffer the data provided to the PrintWriter returned by out() to be able to set the status of the response + * message when it can be determined. This is a no-op, if data is already buffered. + */ + public void hold() { + if (this.bufferedOut == null) { + this.buffer = new StringWriter(); + this.bufferedOut = new PrintWriter(this.buffer); + } + } + + /** + * release the data that was buffered by calling hold() and remove the buffer from the pipeline. + * This is a no-op, if data is not buffered. + */ + public void release() { + if (this.bufferedOut != null) { + this.bufferedOut.flush(); + this.out.write(buffer.toString()); + this.bufferedOut = null; + this.buffer = null; + } } public AlgebricksAppendable resultPrefix(AlgebricksAppendable app) throws AlgebricksException { diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java index 616c22e..64ea73d 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java @@ -116,6 +116,8 @@ throw new Exception(err.toString(), err); } } + // no errors - stop buffering and allow for streaming result delivery + sessionOutput.release(); IStatementExecutor.ResultMetadata resultMetadata = responseMsg.getMetadata(); if (delivery == IStatementExecutor.ResultDelivery.IMMEDIATE && !resultMetadata.getResultSets().isEmpty()) { diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java index f8f5c18..42c9edd 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java @@ -381,25 +381,25 @@ RequestParameters param = getRequestParameters(request); LOGGER.info(param.toString()); long elapsedStart = System.nanoTime(); - final StringWriter stringWriter = new StringWriter(); - final PrintWriter resultWriter = new PrintWriter(stringWriter); + final PrintWriter httpWriter = response.writer(); ResultDelivery delivery = parseResultDelivery(param.mode); String handleUrl = getHandleUrl(param.host, param.path, delivery); - SessionOutput sessionOutput = createSessionOutput(param, handleUrl, resultWriter); + SessionOutput sessionOutput = createSessionOutput(param, handleUrl, httpWriter); SessionConfig sessionConfig = sessionOutput.config(); HttpUtil.setContentType(response, HttpUtil.ContentType.APPLICATION_JSON, HttpUtil.Encoding.UTF8); - HttpResponseStatus status = HttpResponseStatus.OK; Stats stats = new Stats(); long[] execStartEnd = new long[] { -1, -1 }; - resultWriter.print("{\n"); - printRequestId(resultWriter); - printClientContextID(resultWriter, param); - printSignature(resultWriter); - printType(resultWriter, sessionConfig); + // buffer the output until we are ready to set the status of the response message correctly + sessionOutput.hold(); + sessionOutput.out().print("{\n"); + printRequestId(sessionOutput.out()); + printClientContextID(sessionOutput.out(), param); + printSignature(sessionOutput.out()); + printType(sessionOutput.out(), sessionConfig); long errorCount = 1; // so far we just return 1 error try { if (param.statement == null || param.statement.isEmpty()) { @@ -410,33 +410,30 @@ if (optionalParamProvider != null) { optionalParams = optionalParamProvider.apply(request); } + response.setStatus(HttpResponseStatus.OK); executeStatement(statementsText, sessionOutput, delivery, stats, param, execStartEnd, optionalParams); if (ResultDelivery.IMMEDIATE == delivery || ResultDelivery.DEFERRED == delivery) { ResultUtil.printStatus(sessionOutput, ResultStatus.SUCCESS); } errorCount = 0; } catch (Exception | TokenMgrError | org.apache.asterix.aqlplus.parser.TokenMgrError e) { - status = handleExecuteStatementException(e); - ResultUtil.printError(resultWriter, e); + response.setStatus(handleExecuteStatementException(e)); + ResultUtil.printError(sessionOutput.out(), e); ResultUtil.printStatus(sessionOutput, ResultStatus.FATAL); } finally { + // make sure that we stop buffering and return the result to the http response + sessionOutput.release(); if (execStartEnd[0] == -1) { execStartEnd[1] = -1; } else if (execStartEnd[1] == -1) { execStartEnd[1] = System.nanoTime(); } } - printMetrics(resultWriter, System.nanoTime() - elapsedStart, execStartEnd[1] - execStartEnd[0], + printMetrics(sessionOutput.out(), System.nanoTime() - elapsedStart, execStartEnd[1] - execStartEnd[0], stats.getCount(), stats.getSize(), stats.getProcessedObjects(), errorCount); - resultWriter.print("}\n"); - resultWriter.flush(); - String result = stringWriter.toString(); - - GlobalConfig.ASTERIX_LOGGER.log(Level.FINE, result); - - response.setStatus(status); - response.writer().print(result); - if (response.writer().checkError()) { + sessionOutput.out().print("}\n"); + sessionOutput.out().flush(); + if (sessionOutput.out().checkError()) { LOGGER.warning("Error flushing output writer"); } } diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java index a52d765..05debaa 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java @@ -2395,6 +2395,8 @@ createAndRunJob(hcc, jobFlags, null, compiler, locker, resultDelivery, id -> { final ResultReader resultReader = new ResultReader(hdc, id, resultSetId); updateJobStats(id, stats); + // stop buffering and allow for streaming result delivery + sessionOutput.release(); ResultUtil.printResults(appCtx, resultReader, sessionOutput, stats, metadataProvider.findOutputRecordType()); }, clientContextId, ctx); -- To view, visit https://asterix-gerrit.ics.uci.edu/2150 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: merged Gerrit-Change-Id: I74f61941f2e75e10f2accd6b2e6be6c1c0cd1490 Gerrit-PatchSet: 4 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: Till Westmann <[email protected]> Gerrit-Reviewer: Anon. E. Moose #1000171 Gerrit-Reviewer: Ian Maxon <[email protected]> Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Michael Blow <[email protected]> Gerrit-Reviewer: Murtadha Hubail <[email protected]> Gerrit-Reviewer: Till Westmann <[email protected]>
