Murtadha Hubail has uploaded a new change for review.
https://asterix-gerrit.ics.uci.edu/3181
Change subject: [NO ISSUE][OTH] Add API To Ensure Request Requirements
......................................................................
[NO ISSUE][OTH] Add API To Ensure Request Requirements
- user model changes: no
- storage format changes: no
- interface changes: yes
Details:
- Add an API to ensure that a client's request requirements are met
before running its job.
Change-Id: Ifb0513e0baf2b473006d4aa23040c86751fbb4fc
---
M
asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/Receptionist.java
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
M
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IReceptionist.java
3 files changed, 30 insertions(+), 8 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/81/3181/1
diff --git
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/Receptionist.java
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/Receptionist.java
index c174c02..f3afaaa 100644
---
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/Receptionist.java
+++
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/Receptionist.java
@@ -26,7 +26,8 @@
import org.apache.asterix.common.api.IRequestReference;
import org.apache.asterix.common.api.RequestReference;
import org.apache.http.HttpHeaders;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
+import org.apache.hyracks.api.job.JobSpecification;
import org.apache.hyracks.http.api.IServletRequest;
import org.apache.hyracks.util.NetworkUtil;
@@ -49,7 +50,13 @@
@Override
public IClientRequest requestReceived(IRequestReference requestRef, String
clientContextId, String statement,
- Map<String, String> optionalParameters) throws
HyracksDataException {
+ Map<String, String> optionalParameters) {
return new ClientRequest(requestRef, clientContextId, statement,
optionalParameters);
}
+
+ @Override
+ public void ensureRequirements(IClientRequest clientRequest,
IMetadataProvider metadataProvider,
+ JobSpecification jobSpec) {
+ // currently we don't have any restrictions
+ }
}
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index df285c6..e2d4005 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -2498,7 +2498,7 @@
case ASYNC:
MutableBoolean printed = new MutableBoolean(false);
executorService.submit(() -> asyncCreateAndRunJob(hcc,
compiler, locker, resultDelivery,
- requestParameters, cancellable, resultSetId, printed));
+ requestParameters, cancellable, resultSetId, printed,
metadataProvider));
synchronized (printed) {
while (!printed.booleanValue()) {
printed.wait();
@@ -2513,7 +2513,7 @@
sessionOutput.release();
ResultUtil.printResults(appCtx, resultReader,
sessionOutput, stats,
metadataProvider.findOutputRecordType());
- }, requestParameters, cancellable, appCtx);
+ }, requestParameters, cancellable, appCtx, metadataProvider);
break;
case DEFERRED:
createAndRunJob(hcc, jobFlags, null, compiler, locker,
resultDelivery, id -> {
@@ -2523,7 +2523,7 @@
outMetadata.getResultSets()
.add(Triple.of(id, resultSetId,
metadataProvider.findOutputRecordType()));
}
- }, requestParameters, cancellable, appCtx);
+ }, requestParameters, cancellable, appCtx, metadataProvider);
break;
default:
break;
@@ -2551,7 +2551,7 @@
private void asyncCreateAndRunJob(IHyracksClientConnection hcc,
IStatementCompiler compiler, IMetadataLocker locker,
ResultDelivery resultDelivery, IRequestParameters
requestParameters, boolean cancellable,
- ResultSetId resultSetId, MutableBoolean printed) {
+ ResultSetId resultSetId, MutableBoolean printed, MetadataProvider
metadataProvider) {
Mutable<JobId> jobId = new MutableObject<>(JobId.INVALID);
try {
createAndRunJob(hcc, jobFlags, jobId, compiler, locker,
resultDelivery, id -> {
@@ -2562,7 +2562,7 @@
printed.setTrue();
printed.notify();
}
- }, requestParameters, cancellable, appCtx);
+ }, requestParameters, cancellable, appCtx, metadataProvider);
} catch (Exception e) {
if (Objects.equals(JobId.INVALID, jobId.getValue())) {
// compilation failed
@@ -2593,7 +2593,8 @@
private static void createAndRunJob(IHyracksClientConnection hcc,
EnumSet<JobFlag> jobFlags, Mutable<JobId> jId,
IStatementCompiler compiler, IMetadataLocker locker,
ResultDelivery resultDelivery, IResultPrinter printer,
- IRequestParameters requestParameters, boolean cancellable,
ICcApplicationContext appCtx) throws Exception {
+ IRequestParameters requestParameters, boolean cancellable,
ICcApplicationContext appCtx,
+ MetadataProvider metadataProvider) throws Exception {
final IRequestTracker requestTracker = appCtx.getRequestTracker();
final ClientRequest clientRequest =
(ClientRequest)
requestTracker.get(requestParameters.getRequestReference().getUuid());
@@ -2603,6 +2604,7 @@
if (jobSpec == null) {
return;
}
+ appCtx.getReceptionist().ensureRequirements(clientRequest,
metadataProvider, jobSpec);
final JobId jobId = JobUtils.runJob(hcc, jobSpec, jobFlags, false);
clientRequest.setJobId(jobId);
if (cancellable) {
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IReceptionist.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IReceptionist.java
index 51df306..2b776fc 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IReceptionist.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IReceptionist.java
@@ -20,7 +20,9 @@
import java.util.Map;
+import org.apache.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobSpecification;
import org.apache.hyracks.http.api.IServletRequest;
public interface IReceptionist {
@@ -45,4 +47,15 @@
*/
IClientRequest requestReceived(IRequestReference requestRef, String
clientContextId, String statement,
Map<String, String> getOptionalParameters) throws
HyracksDataException;
+
+ /**
+ * Ensures a client's request can be executed before its job is started
+ *
+ * @param clientRequest
+ * @param metadataProvider
+ * @param jobSpec
+ * @throws HyracksDataException
+ */
+ void ensureRequirements(IClientRequest clientRequest, IMetadataProvider
metadataProvider, JobSpecification jobSpec)
+ throws HyracksDataException;
}
--
To view, visit https://asterix-gerrit.ics.uci.edu/3181
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: Ifb0513e0baf2b473006d4aa23040c86751fbb4fc
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Murtadha Hubail <[email protected]>