>From Ali Alsuliman <[email protected]>:
Ali Alsuliman has uploaded this change for review. (
https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/21271?usp=email )
Change subject: [ASTERIXDB-3649][API] Untrack non-ASYNC requests without result
......................................................................
[ASTERIXDB-3649][API] Untrack non-ASYNC requests without result
- user model changes: no
- storage format changes: no
- interface changes: yes
Details:
For a non-ASYNC request whose statement does not produce a
result, remove the request from the async/deferred request
tracking once the statement execution is done.
- When a request is archived, clear its reference to the thread
that executed the request.
Ext-ref: MB-71997
Change-Id: I48f88ef268447f1bced0006087b21dc8bb1ff4c1
---
M
asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
M
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java
M
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java
4 files changed, 30 insertions(+), 7 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/71/21271/1
diff --git
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java
index 72c2ebc..6a533b3 100644
---
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java
+++
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java
@@ -44,7 +44,7 @@
StorageUtil.getIntSizeInBytes(64,
StorageUtil.StorageUnit.KILOBYTE);
protected final long creationTime = System.nanoTime();
protected final long creationSystemTime = System.currentTimeMillis();
- protected final Thread executor;
+ protected Thread executor;
protected final String statement;
protected final String clientContextId;
protected final JobState jobState;
@@ -66,6 +66,11 @@
return clientContextId;
}
+ @Override
+ public void archived() {
+ executor = null;
+ }
+
public void setPlan(String plan) {
if (plan != null) {
this.plan = plan.length() > MAX_STATEMENT_LENGTH ?
plan.substring(0, MAX_STATEMENT_LENGTH) : plan;
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 86b71d4..14eda1e 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -371,12 +371,13 @@
@Override
public void compileAndExecute(IHyracksClientConnection hcc,
IRequestParameters requestParameters) throws Exception {
validateStatements(requestParameters);
- trackRequest(requestParameters);
+ boolean trackInAsyncDeferredRequests =
shouldTrackAsSeparateRequest(requestParameters);
+ trackRequest(requestParameters, trackInAsyncDeferredRequests);
Counter resultSetIdCounter = new Counter(0);
FileSplit outputFile = null;
String threadName = Thread.currentThread().getName();
- Thread.currentThread().setName(
- QueryTranslator.class.getSimpleName() + ":" +
requestParameters.getRequestReference().getUuid());
+ String reqId = requestParameters.getRequestReference().getUuid();
+ Thread.currentThread().setName(QueryTranslator.class.getSimpleName() +
":" + reqId);
Map<String, String> config = new HashMap<>();
final IResultSet resultSet = requestParameters.getResultSet();
final ResultDelivery resultDelivery =
requestParameters.getResultProperties().getDelivery();
@@ -386,6 +387,7 @@
final ResultMetadata outMetadata = requestParameters.getOutMetadata();
final Map<String, IAObject> stmtParams =
requestParameters.getStatementParameters();
warningCollector.setMaxWarnings(sessionConfig.getMaxWarnings());
+ boolean canDeliverResult = false;
try {
for (Statement stmt : statements) {
if (sessionConfig.is(SessionConfig.FORMAT_HTML)) {
@@ -513,6 +515,7 @@
if (stats.getProfileType() == Stats.ProfileType.FULL) {
this.jobFlags.add(JobFlag.PROFILE_RUNTIME);
}
+ canDeliverResult = true;
handleCopyToStatement(metadataProvider, stmt, hcc,
resultSet, resultDelivery, outMetadata,
requestParameters, stmtParams, stats);
break;
@@ -524,6 +527,7 @@
metadataProvider.setResultAsyncMode(resultDelivery
== ResultDelivery.ASYNC
|| resultDelivery ==
ResultDelivery.DEFERRED);
metadataProvider.setMaxResultReads(maxResultReads);
+ canDeliverResult = true;
}
if (stats.getProfileType() == Stats.ProfileType.FULL) {
this.jobFlags.add(JobFlag.PROFILE_RUNTIME);
@@ -566,6 +570,7 @@
if (stats.getProfileType() == Stats.ProfileType.FULL) {
this.jobFlags.add(JobFlag.PROFILE_RUNTIME);
}
+ canDeliverResult = true;
handleQuery(metadataProvider, (Query) stmt, hcc,
resultSet, resultDelivery, outMetadata, stats,
requestParameters, stmtParams, stmtRewriter);
break;
@@ -599,12 +604,17 @@
}
}
} catch (Exception ex) {
+ // need to handle job and request clean-ups in case of failures
this.appCtx.getRequestTracker().incrementFailedRequests();
throw ex;
} finally {
// async queries are completed after their job completes
if (statements.isEmpty() || ResultDelivery.ASYNC !=
resultDelivery) {
-
appCtx.getRequestTracker().complete(requestParameters.getRequestReference().getUuid());
+ // this assumes 1-1 mapping between a request and a statement,
needs to be adapted for multi-statement
+ appCtx.getRequestTracker().complete(reqId);
+ if (trackInAsyncDeferredRequests && ResultDelivery.ASYNC !=
resultDelivery && !canDeliverResult) {
+
appCtx.getRequestTracker().removeAsyncOrDeferredRequest(reqId);
+ }
}
Thread.currentThread().setName(threadName);
}
@@ -5991,10 +6001,11 @@
}
}
- protected void trackRequest(IRequestParameters requestParameters) throws
HyracksDataException {
+ protected void trackRequest(IRequestParameters requestParameters, boolean
trackInAsyncDeferredRequests)
+ throws HyracksDataException {
final IClientRequest clientRequest =
appCtx.getReceptionist().requestReceived(requestParameters);
this.appCtx.getRequestTracker().track(clientRequest);
- if (shouldTrackAsSeparateRequest(requestParameters)) {
+ if (trackInAsyncDeferredRequests) {
appCtx.getRequestTracker().trackAsyncOrDeferredRequest(clientRequest);
}
}
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java
index 2d239c1..496b2a1 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java
@@ -101,6 +101,12 @@
boolean cancel(ICcApplicationContext appCtx) throws HyracksDataException;
/**
+ * Called when the request is archived.
+ * The request is archived when it is completed or cancelled and removed
from the list of running requests.
+ */
+ void archived();
+
+ /**
* @return A json string representation of this request
*/
String toJson();
diff --git
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java
index f596e07..d16dd66 100644
---
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java
+++
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java
@@ -149,6 +149,7 @@
clientIdRequests.remove(completedRequest.getClientContextId());
}
archive(completedRequest);
+ completedRequest.archived();
}
}
--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/21271?usp=email
To unsubscribe, or for help writing mail filters, visit
https://asterix-gerrit.ics.uci.edu/settings?usp=email
Gerrit-MessageType: newchange
Gerrit-Project: asterixdb
Gerrit-Branch: lumina
Gerrit-Change-Id: I48f88ef268447f1bced0006087b21dc8bb1ff4c1
Gerrit-Change-Number: 21271
Gerrit-PatchSet: 1
Gerrit-Owner: Ali Alsuliman <[email protected]>