Murtadha Hubail has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/3181

Change subject: [NO ISSUE][OTH] Add API To Ensure Request Requirements
......................................................................

[NO ISSUE][OTH] Add API To Ensure Request Requirements

- user model changes: no
- storage format changes: no
- interface changes: yes

Details:
- Add API to ensure a client's request requirement is met
  before running its job.

Change-Id: Ifb0513e0baf2b473006d4aa23040c86751fbb4fc
---
M 
asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/Receptionist.java
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
M 
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IReceptionist.java
3 files changed, 30 insertions(+), 8 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb 
refs/changes/81/3181/1

diff --git 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/Receptionist.java
 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/Receptionist.java
index c174c02..f3afaaa 100644
--- 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/Receptionist.java
+++ 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/Receptionist.java
@@ -26,7 +26,8 @@
 import org.apache.asterix.common.api.IRequestReference;
 import org.apache.asterix.common.api.RequestReference;
 import org.apache.http.HttpHeaders;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
+import org.apache.hyracks.api.job.JobSpecification;
 import org.apache.hyracks.http.api.IServletRequest;
 import org.apache.hyracks.util.NetworkUtil;
 
@@ -49,7 +50,13 @@
 
     @Override
     public IClientRequest requestReceived(IRequestReference requestRef, String 
clientContextId, String statement,
-            Map<String, String> optionalParameters) throws 
HyracksDataException {
+            Map<String, String> optionalParameters) {
         return new ClientRequest(requestRef, clientContextId, statement, 
optionalParameters);
     }
+
+    @Override
+    public void ensureRequirements(IClientRequest clientRequest, 
IMetadataProvider metadataProvider,
+            JobSpecification jobSpec) {
+        // currently we don't have any restrictions
+    }
 }
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index df285c6..e2d4005 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -2498,7 +2498,7 @@
             case ASYNC:
                 MutableBoolean printed = new MutableBoolean(false);
                 executorService.submit(() -> asyncCreateAndRunJob(hcc, 
compiler, locker, resultDelivery,
-                        requestParameters, cancellable, resultSetId, printed));
+                        requestParameters, cancellable, resultSetId, printed, 
metadataProvider));
                 synchronized (printed) {
                     while (!printed.booleanValue()) {
                         printed.wait();
@@ -2513,7 +2513,7 @@
                     sessionOutput.release();
                     ResultUtil.printResults(appCtx, resultReader, 
sessionOutput, stats,
                             metadataProvider.findOutputRecordType());
-                }, requestParameters, cancellable, appCtx);
+                }, requestParameters, cancellable, appCtx, metadataProvider);
                 break;
             case DEFERRED:
                 createAndRunJob(hcc, jobFlags, null, compiler, locker, 
resultDelivery, id -> {
@@ -2523,7 +2523,7 @@
                         outMetadata.getResultSets()
                                 .add(Triple.of(id, resultSetId, 
metadataProvider.findOutputRecordType()));
                     }
-                }, requestParameters, cancellable, appCtx);
+                }, requestParameters, cancellable, appCtx, metadataProvider);
                 break;
             default:
                 break;
@@ -2551,7 +2551,7 @@
 
     private void asyncCreateAndRunJob(IHyracksClientConnection hcc, 
IStatementCompiler compiler, IMetadataLocker locker,
             ResultDelivery resultDelivery, IRequestParameters 
requestParameters, boolean cancellable,
-            ResultSetId resultSetId, MutableBoolean printed) {
+            ResultSetId resultSetId, MutableBoolean printed, MetadataProvider 
metadataProvider) {
         Mutable<JobId> jobId = new MutableObject<>(JobId.INVALID);
         try {
             createAndRunJob(hcc, jobFlags, jobId, compiler, locker, 
resultDelivery, id -> {
@@ -2562,7 +2562,7 @@
                     printed.setTrue();
                     printed.notify();
                 }
-            }, requestParameters, cancellable, appCtx);
+            }, requestParameters, cancellable, appCtx, metadataProvider);
         } catch (Exception e) {
             if (Objects.equals(JobId.INVALID, jobId.getValue())) {
                 // compilation failed
@@ -2593,7 +2593,8 @@
 
     private static void createAndRunJob(IHyracksClientConnection hcc, 
EnumSet<JobFlag> jobFlags, Mutable<JobId> jId,
             IStatementCompiler compiler, IMetadataLocker locker, 
ResultDelivery resultDelivery, IResultPrinter printer,
-            IRequestParameters requestParameters, boolean cancellable, 
ICcApplicationContext appCtx) throws Exception {
+            IRequestParameters requestParameters, boolean cancellable, 
ICcApplicationContext appCtx,
+            MetadataProvider metadataProvider) throws Exception {
         final IRequestTracker requestTracker = appCtx.getRequestTracker();
         final ClientRequest clientRequest =
                 (ClientRequest) 
requestTracker.get(requestParameters.getRequestReference().getUuid());
@@ -2603,6 +2604,7 @@
             if (jobSpec == null) {
                 return;
             }
+            appCtx.getReceptionist().ensureRequirements(clientRequest, 
metadataProvider, jobSpec);
             final JobId jobId = JobUtils.runJob(hcc, jobSpec, jobFlags, false);
             clientRequest.setJobId(jobId);
             if (cancellable) {
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IReceptionist.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IReceptionist.java
index 51df306..2b776fc 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IReceptionist.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IReceptionist.java
@@ -20,7 +20,9 @@
 
 import java.util.Map;
 
+import org.apache.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobSpecification;
 import org.apache.hyracks.http.api.IServletRequest;
 
 public interface IReceptionist {
@@ -45,4 +47,15 @@
      */
     IClientRequest requestReceived(IRequestReference requestRef, String 
clientContextId, String statement,
             Map<String, String> getOptionalParameters) throws 
HyracksDataException;
+
+    /**
+     * Ensures a client's request can be executed before its job is started
+     *
+     * @param clientRequest
+     * @param metadataProvider
+     * @param jobSpec
+     * @throws HyracksDataException
+     */
+    void ensureRequirements(IClientRequest clientRequest, IMetadataProvider 
metadataProvider, JobSpecification jobSpec)
+            throws HyracksDataException;
 }

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/3181
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: Ifb0513e0baf2b473006d4aa23040c86751fbb4fc
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Murtadha Hubail <[email protected]>

Reply via email to