Murtadha Hubail has submitted this change and it was merged. Change subject: [NO ISSUE][OTH] Add API To Ensure Request Requirements ......................................................................
[NO ISSUE][OTH] Add API To Ensure Request Requirements - user model changes: no - storage format changes: no - interface changes: yes Details: - Refactor IRequestParameters as ICommonRequestParameters and IRequestParameters to break cyclic dependencies. - Add new API to ensure request can be scheduled for execution. Change-Id: Ifb0513e0baf2b473006d4aa23040c86751fbb4fc Reviewed-on: https://asterix-gerrit.ics.uci.edu/3181 Reviewed-by: Murtadha Hubail <[email protected]> Integration-Tests: Murtadha Hubail <[email protected]> Tested-by: Murtadha Hubail <[email protected]> --- M asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java M asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IRequestParameters.java M asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/Receptionist.java A asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SchedulableClientRequest.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java M asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryCancellationServletTest.java A asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/ICommonRequestParameters.java M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IReceptionist.java A asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/ISchedulableClientRequest.java 9 files changed, 218 insertions(+), 63 deletions(-) Approvals: Murtadha Hubail: Looks good to me, approved; Verified; Verified diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java index 014bf3c..fe6aeea 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java @@ -18,9 +18,7 @@ */ package org.apache.asterix.translator; -import java.util.Map; - -import org.apache.asterix.common.api.IRequestReference; +import org.apache.asterix.common.api.ICommonRequestParameters; import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.hyracks.api.client.IHyracksClientConnection; import org.apache.hyracks.api.exceptions.HyracksDataException; @@ -35,11 +33,10 @@ protected Thread executor; protected String clientContextId; - public ClientRequest(IRequestReference requestReference, String clientContextId, String statement, - Map<String, String> optionalParameters) { - super(requestReference); - this.clientContextId = clientContextId; - this.statement = statement; + public ClientRequest(ICommonRequestParameters requestParameters) { + super(requestParameters.getRequestReference()); + this.clientContextId = requestParameters.getClientContextId(); + this.statement = requestParameters.getStatement(); this.executor = Thread.currentThread(); } diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IRequestParameters.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IRequestParameters.java index 86ba301..6e41cd2 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IRequestParameters.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IRequestParameters.java @@ -20,12 +20,13 @@ import java.util.Map; +import org.apache.asterix.common.api.ICommonRequestParameters; import org.apache.asterix.common.api.IRequestReference; import org.apache.asterix.om.base.IAObject; import org.apache.asterix.translator.IStatementExecutor.Stats; import org.apache.hyracks.api.result.IResultSet; -public interface IRequestParameters { +public interface IRequestParameters extends ICommonRequestParameters { /** * @return A Resultset client object that is used to read the results. @@ -50,36 +51,7 @@ IStatementExecutor.ResultMetadata getOutMetadata(); /** - * @return the client context id for the query - */ - String getClientContextId(); - - /** - * @return Optional request parameters. Otherwise null. - */ - Map<String, String> getOptionalParameters(); - - /** * @return Statement parameters */ Map<String, IAObject> getStatementParameters(); - - /** - * @return true if the request accepts multiple statements. Otherwise, false. - */ - boolean isMultiStatement(); - - /** - * Gets the statement the client provided with the request - * - * @return the request statement - */ - String getStatement(); - - /** - * The request reference of this {@link IRequestParameters} - * - * @return the request reference - */ - IRequestReference getRequestReference(); } diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/Receptionist.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/Receptionist.java index c174c02..52aab20 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/Receptionist.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/Receptionist.java @@ -18,15 +18,15 @@ */ package org.apache.asterix.translator; -import java.util.Map; import java.util.UUID; import org.apache.asterix.common.api.IClientRequest; +import org.apache.asterix.common.api.ICommonRequestParameters; import org.apache.asterix.common.api.IReceptionist; import org.apache.asterix.common.api.IRequestReference; +import org.apache.asterix.common.api.ISchedulableClientRequest; import org.apache.asterix.common.api.RequestReference; import org.apache.http.HttpHeaders; -import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.http.api.IServletRequest; import org.apache.hyracks.util.NetworkUtil; @@ -48,8 +48,12 @@ } @Override - public IClientRequest requestReceived(IRequestReference requestRef, String clientContextId, String statement, - Map<String, String> optionalParameters) throws HyracksDataException { - return new ClientRequest(requestRef, clientContextId, statement, optionalParameters); + public IClientRequest requestReceived(ICommonRequestParameters requestParameters) { + return new ClientRequest(requestParameters); + } + + @Override + public void ensureSchedulable(ISchedulableClientRequest schedulableRequest) { + // currently we don't have any restrictions } } diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SchedulableClientRequest.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SchedulableClientRequest.java new file mode 100644 index 0000000..ca04463 --- /dev/null +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SchedulableClientRequest.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.translator; + +import org.apache.asterix.common.api.IClientRequest; +import org.apache.asterix.common.api.ICommonRequestParameters; +import org.apache.asterix.common.api.ISchedulableClientRequest; +import org.apache.hyracks.algebricks.core.algebra.metadata.IMetadataProvider; +import org.apache.hyracks.api.job.JobSpecification; + +public class SchedulableClientRequest implements ISchedulableClientRequest { + + private final IClientRequest clientRequest; + private final JobSpecification jobSpec; + private final IMetadataProvider metadataProvider; + private final ICommonRequestParameters requestParameters; + + private SchedulableClientRequest(IClientRequest clientRequest, ICommonRequestParameters requestParameters, + IMetadataProvider metadataProvider, JobSpecification jobSpec) { + this.clientRequest = clientRequest; + this.requestParameters = requestParameters; + this.metadataProvider = metadataProvider; + this.jobSpec = jobSpec; + } + + public static SchedulableClientRequest of(IClientRequest clientRequest, ICommonRequestParameters requestParameters, + IMetadataProvider metadataProvider, JobSpecification jobSpec) { + return new SchedulableClientRequest(clientRequest, requestParameters, metadataProvider, jobSpec); + } + + @Override + public IClientRequest getClientRequest() { + return clientRequest; + } + + @Override + public ICommonRequestParameters getRequestParameters() { + return requestParameters; + } + + @Override + public JobSpecification getJobSpecification() { + return jobSpec; + } + + @Override + public IMetadataProvider getMetadataProvider() { + return metadataProvider; + } +} diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java index f89b2db..c49fcdd 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java @@ -163,6 +163,7 @@ import org.apache.asterix.translator.ExecutionPlansHtmlPrintUtil; import org.apache.asterix.translator.IRequestParameters; import org.apache.asterix.translator.IStatementExecutor; +import org.apache.asterix.translator.SchedulableClientRequest; import org.apache.asterix.translator.SessionConfig; import org.apache.asterix.translator.SessionOutput; import org.apache.asterix.translator.TypeTranslator; @@ -2499,7 +2500,7 @@ case ASYNC: MutableBoolean printed = new MutableBoolean(false); executorService.submit(() -> asyncCreateAndRunJob(hcc, compiler, locker, resultDelivery, - requestParameters, cancellable, resultSetId, printed)); + requestParameters, cancellable, resultSetId, printed, metadataProvider)); synchronized (printed) { while (!printed.booleanValue()) { printed.wait(); @@ -2514,7 +2515,7 @@ sessionOutput.release(); ResultUtil.printResults(appCtx, resultReader, sessionOutput, stats, metadataProvider.findOutputRecordType()); - }, requestParameters, cancellable, appCtx); + }, requestParameters, cancellable, appCtx, metadataProvider); break; case DEFERRED: createAndRunJob(hcc, jobFlags, null, compiler, locker, resultDelivery, id -> { @@ -2524,7 +2525,7 @@ outMetadata.getResultSets() .add(Triple.of(id, resultSetId, metadataProvider.findOutputRecordType())); } - }, requestParameters, cancellable, appCtx); + }, requestParameters, cancellable, appCtx, metadataProvider); break; default: break; @@ -2552,7 +2553,7 @@ private void asyncCreateAndRunJob(IHyracksClientConnection hcc, IStatementCompiler compiler, IMetadataLocker locker, ResultDelivery resultDelivery, IRequestParameters requestParameters, boolean cancellable, - ResultSetId resultSetId, MutableBoolean printed) { + ResultSetId resultSetId, MutableBoolean printed, MetadataProvider metadataProvider) { Mutable<JobId> jobId = new MutableObject<>(JobId.INVALID); try { createAndRunJob(hcc, jobFlags, jobId, compiler, locker, resultDelivery, id -> { @@ -2563,7 +2564,7 @@ printed.setTrue(); printed.notify(); } - }, requestParameters, cancellable, appCtx); + }, requestParameters, cancellable, appCtx, metadataProvider); } catch (Exception e) { if (Objects.equals(JobId.INVALID, jobId.getValue())) { // compilation failed @@ -2594,7 +2595,8 @@ private static void createAndRunJob(IHyracksClientConnection hcc, EnumSet<JobFlag> jobFlags, Mutable<JobId> jId, IStatementCompiler compiler, IMetadataLocker locker, ResultDelivery resultDelivery, IResultPrinter printer, - IRequestParameters requestParameters, boolean cancellable, ICcApplicationContext appCtx) throws Exception { + IRequestParameters requestParameters, boolean cancellable, ICcApplicationContext appCtx, + MetadataProvider metadataProvider) throws Exception { final IRequestTracker requestTracker = appCtx.getRequestTracker(); final ClientRequest clientRequest = (ClientRequest) requestTracker.get(requestParameters.getRequestReference().getUuid()); @@ -2607,6 +2609,9 @@ if (cancellable) { clientRequest.markCancellable(); } + final SchedulableClientRequest schedulableRequest = + SchedulableClientRequest.of(clientRequest, requestParameters, metadataProvider, jobSpec); + appCtx.getReceptionist().ensureSchedulable(schedulableRequest); final JobId jobId = JobUtils.runJob(hcc, jobSpec, jobFlags, false); clientRequest.setJobId(jobId); if (jId != null) { @@ -2946,9 +2951,7 @@ } protected void trackRequest(IRequestParameters requestParameters) throws HyracksDataException { - final IClientRequest clientRequest = appCtx.getReceptionist().requestReceived( - requestParameters.getRequestReference(), requestParameters.getClientContextId(), - requestParameters.getStatement(), requestParameters.getOptionalParameters()); + final IClientRequest clientRequest = appCtx.getReceptionist().requestReceived(requestParameters); appCtx.getRequestTracker().track(clientRequest); } diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryCancellationServletTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryCancellationServletTest.java index 1a526b2..2eef241 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryCancellationServletTest.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryCancellationServletTest.java @@ -30,6 +30,7 @@ import org.apache.asterix.api.http.server.CcQueryCancellationServlet; import org.apache.asterix.api.http.server.ServletConstants; +import org.apache.asterix.app.translator.RequestParameters; import org.apache.asterix.common.api.RequestReference; import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.asterix.runtime.utils.RequestTracker; @@ -66,7 +67,9 @@ verify(mockResponse, times(1)).setStatus(HttpResponseStatus.NOT_FOUND); final RequestReference requestReference = RequestReference.of("1", "node1", System.currentTimeMillis()); - ClientRequest request = new ClientRequest(requestReference, "1", "select 1;", new HashMap<>()); + RequestParameters requestParameters = + new RequestParameters(requestReference, "select 1", null, null, null, null, "1", null, null, true); + ClientRequest request = new ClientRequest(requestParameters); request.setJobId(new JobId(1)); request.markCancellable(); tracker.track(request); @@ -81,7 +84,9 @@ // Tests the case that the job cancellation hit some exception from Hyracks. final RequestReference requestReference2 = RequestReference.of("2", "node1", System.currentTimeMillis()); - ClientRequest request2 = new ClientRequest(requestReference2, "2", "select 1;", new HashMap<>()); + requestParameters = + new RequestParameters(requestReference2, "select 1", null, null, null, null, "2", null, null, true); + ClientRequest request2 = new ClientRequest(requestParameters); request2.setJobId(new JobId(2)); request2.markCancellable(); tracker.track(request2); diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/ICommonRequestParameters.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/ICommonRequestParameters.java new file mode 100644 index 0000000..8daff1d --- /dev/null +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/ICommonRequestParameters.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.common.api; + +import java.util.Map; + +public interface ICommonRequestParameters { + + /** + * The request reference of this {@link ICommonRequestParameters} + * + * @return the request reference + */ + IRequestReference getRequestReference(); + + /** + * @return the client context id for the request + */ + String getClientContextId(); + + /** + * @return Optional request parameters. Otherwise null. + */ + Map<String, String> getOptionalParameters(); + + /** + * @return true if the request accepts multiple statements. Otherwise, false. + */ + boolean isMultiStatement(); + + /** + * Gets the statement the client provided with the request + * + * @return the request statement + */ + String getStatement(); +} diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IReceptionist.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IReceptionist.java index 51df306..95ed22e 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IReceptionist.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IReceptionist.java @@ -18,8 +18,6 @@ */ package org.apache.asterix.common.api; -import java.util.Map; - import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.http.api.IServletRequest; @@ -36,13 +34,17 @@ /** * Generates a {@link IClientRequest} based on the requests parameters * - * @param requestRef - * @param clientContextId - * @param statement - * @param getOptionalParameters - * @return A client request + * @param requestParameters + * @return the client request * @throws HyracksDataException */ - IClientRequest requestReceived(IRequestReference requestRef, String clientContextId, String statement, - Map<String, String> getOptionalParameters) throws HyracksDataException; + IClientRequest requestReceived(ICommonRequestParameters requestParameters) throws HyracksDataException; + + /** + * Ensures a client's request can be executed before its job is started + * + * @param schedulableRequest + * @throws HyracksDataException + */ + void ensureSchedulable(ISchedulableClientRequest schedulableRequest) throws HyracksDataException; } diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/ISchedulableClientRequest.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/ISchedulableClientRequest.java new file mode 100644 index 0000000..7723550 --- /dev/null +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/ISchedulableClientRequest.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.common.api; + +import org.apache.hyracks.algebricks.core.algebra.metadata.IMetadataProvider; +import org.apache.hyracks.api.job.JobSpecification; + +public interface ISchedulableClientRequest { + + /** + * Gets the client request + * + * @return the client request + */ + IClientRequest getClientRequest(); + + /** + * Gets the request common parameters + * + * @return the request common parameters + */ + ICommonRequestParameters getRequestParameters(); + + /** + * Gets the request's job specification + * + * @return + */ + JobSpecification getJobSpecification(); + + /** + * Gets the metadata provider used to execute this request + * + * @return the metadata provider + */ + IMetadataProvider getMetadataProvider(); +} \ No newline at end of file -- To view, visit https://asterix-gerrit.ics.uci.edu/3181 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: merged Gerrit-Change-Id: Ifb0513e0baf2b473006d4aa23040c86751fbb4fc Gerrit-PatchSet: 7 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: Murtadha Hubail <[email protected]> Gerrit-Reviewer: Anon. E. Moose #1000171 Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Murtadha Hubail <[email protected]> Gerrit-Reviewer: Till Westmann <[email protected]>
