Steven Jacobs has submitted this change and it was merged. Change subject: Coordinated change to support parameterized queries ......................................................................
Coordinated change to support parameterized queries Change-Id: Icce06a1548a4f4150545c1fda7e5be3608472af5 --- M asterix-bad/src/main/java/org/apache/asterix/bad/BADJobService.java M asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADCompilationProvider.java M asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java M asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java M asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java M asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java M asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java M asterix-bad/src/main/java/org/apache/asterix/bad/recovery/BADGlobalRecoveryManager.java 8 files changed, 23 insertions(+), 44 deletions(-) Approvals: Steven Jacobs: Looks good to me, approved Till Westmann: Looks good to me, but someone else must approve Jenkins: Verified diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/BADJobService.java b/asterix-bad/src/main/java/org/apache/asterix/bad/BADJobService.java index 69145d9..c48ec54 100644 --- a/asterix-bad/src/main/java/org/apache/asterix/bad/BADJobService.java +++ b/asterix-bad/src/main/java/org/apache/asterix/bad/BADJobService.java @@ -108,9 +108,8 @@ public static boolean runDeployedJobSpecCheckPeriod(DeployedJobSpecId distributedId, IHyracksClientConnection hcc, Map<byte[], byte[]> jobParameters, long period, EntityId entityId, ITxnIdFactory txnIdFactory, DeployedJobSpecEventListener listener) throws Exception { - long executionMilliseconds = - runDeployedJobSpec(distributedId, hcc, null, jobParameters, entityId, txnIdFactory, null, listener, - null); + long executionMilliseconds = runDeployedJobSpec(distributedId, hcc, null, jobParameters, entityId, txnIdFactory, + null, listener, null); if (executionMilliseconds > period) { LOGGER.log(Level.SEVERE, "Periodic job for " + entityId.getExtensionName() + " " + entityId.getDataverse() + "." @@ -153,7 +152,6 @@ } - public static long findPeriod(String duration) { //TODO: Allow Repetitive Channels to use YMD durations String hoursMinutesSeconds = ""; @@ -189,7 +187,8 @@ metadataProvider.setMetadataTxnContext(mdTxnCtx); JobSpecification jobSpec = null; try { - jobSpec = ((QueryTranslator) statementExecutor).rewriteCompileQuery(hcc, metadataProvider, q, null); + jobSpec = ((QueryTranslator) statementExecutor).rewriteCompileQuery(hcc, metadataProvider, q, null, null, + null); MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); bActiveTxn = false; } catch (Exception e) { @@ -230,7 +229,7 @@ jobSpec = compilePushChannel(badStatementExecutor, metadataProvider, hcc, (Query) fStatements.get(1)); } else { jobSpec = badStatementExecutor.handleInsertUpsertStatement(metadataProvider, fStatements.get(1), hcc, - null, null, null, null, true, null); + null, null, null, null, true, null, null, null); } } else { //Procedures @@ -263,7 +262,7 @@ metadataProvider.setMetadataTxnContext(mdTxnCtx); JobSpecification jobSpec; try { - jobSpec = statementExecutor.rewriteCompileQuery(hcc, metadataProvider, q, null); + jobSpec = statementExecutor.rewriteCompileQuery(hcc, metadataProvider, q, null, null, null); MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); } catch (Exception e) { ((QueryTranslator) statementExecutor).abort(e, e, mdTxnCtx); @@ -277,14 +276,15 @@ IStatementExecutor.Stats stats, Statement procedureStatement) throws Exception { if (procedureStatement.getKind() == Statement.Kind.INSERT) { return ((QueryTranslator) statementExecutor).handleInsertUpsertStatement(metadataProvider, - procedureStatement, hcc, hdc, IStatementExecutor.ResultDelivery.ASYNC, null, stats, true, null); + procedureStatement, hcc, hdc, IStatementExecutor.ResultDelivery.ASYNC, null, stats, true, null, + null, null); } else if (procedureStatement.getKind() == Statement.Kind.QUERY) { return compileQueryJob(statementExecutor, metadataProvider, hcc, (Query) procedureStatement); } else { SqlppDeleteRewriteVisitor visitor = new SqlppDeleteRewriteVisitor(); procedureStatement.accept(visitor, null); return ((QueryTranslator) statementExecutor).handleDeleteStatement(metadataProvider, procedureStatement, - hcc, true); + hcc, true, null, null); } } diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADCompilationProvider.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADCompilationProvider.java index 2a3d4fb..99f0d66 100644 --- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADCompilationProvider.java +++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADCompilationProvider.java @@ -18,36 +18,15 @@ */ package org.apache.asterix.bad.lang; -import org.apache.asterix.algebra.base.ILangExpressionToPlanTranslatorFactory; -import org.apache.asterix.compiler.provider.ILangCompilationProvider; import org.apache.asterix.compiler.provider.IRuleSetFactory; -import org.apache.asterix.lang.common.base.IAstPrintVisitorFactory; +import org.apache.asterix.compiler.provider.SqlppCompilationProvider; import org.apache.asterix.lang.common.base.IParserFactory; -import org.apache.asterix.lang.common.base.IRewriterFactory; -import org.apache.asterix.lang.sqlpp.rewrites.SqlppRewriterFactory; -import org.apache.asterix.lang.sqlpp.visitor.SqlppAstPrintVisitorFactory; -import org.apache.asterix.translator.SqlppExpressionToPlanTranslatorFactory; -public class BADCompilationProvider implements ILangCompilationProvider { +public class BADCompilationProvider extends SqlppCompilationProvider { @Override public IParserFactory getParserFactory() { return new BADParserFactory(); - } - - @Override - public IRewriterFactory getRewriterFactory() { - return new SqlppRewriterFactory(); - } - - @Override - public IAstPrintVisitorFactory getAstPrintVisitorFactory() { - return new SqlppAstPrintVisitorFactory(); - } - - @Override - public ILangExpressionToPlanTranslatorFactory getExpressionToPlanTranslatorFactory() { - return new SqlppExpressionToPlanTranslatorFactory(); } @Override diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java index 2f23a9c..3ea2c0d 100644 --- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java +++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java @@ -334,7 +334,7 @@ } } } - final IRequestParameters requestParameters = new RequestParameters(null, null, null, null, null, null); + final IRequestParameters requestParameters = new RequestParameters(null, null, null, null, null, null, null); for (Channel channel : channels) { if (!channel.getChannelId().getDataverse().equals(dvId.getValue())) { continue; diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java index ca1241c..7583f0b 100644 --- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java +++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java @@ -207,13 +207,13 @@ InsertStatement insert = new InsertStatement(new Identifier(dataverse), new Identifier(subscriptionsDatasetName), subscriptionTuple, varCounter, resultVar, accessor); ((QueryTranslator) statementExecutor).handleInsertUpsertStatement(tempMdProvider, insert, hcc, hdc, - resultDelivery, null, stats, false, null); + resultDelivery, null, stats, false, null, null, null); } else { //To update an existing subscription UpsertStatement upsert = new UpsertStatement(new Identifier(dataverse), new Identifier(subscriptionsDatasetName), subscriptionTuple, varCounter, null, null); ((QueryTranslator) statementExecutor).handleInsertUpsertStatement(tempMdProvider, upsert, hcc, hdc, - resultDelivery, null, stats, false, null); + resultDelivery, null, stats, false, null, null, null); } MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java index 1b18f83..b23bf3b 100644 --- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java +++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java @@ -139,7 +139,8 @@ MetadataProvider tempMdProvider = new MetadataProvider(metadataProvider.getApplicationContext(), metadataProvider.getDefaultDataverse()); tempMdProvider.getConfig().putAll(metadataProvider.getConfig()); - ((QueryTranslator) statementExecutor).handleDeleteStatement(tempMdProvider, delete, hcc, false); + ((QueryTranslator) statementExecutor).handleDeleteStatement(tempMdProvider, delete, hcc, false, null, + null); MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); } catch (Exception e) { QueryTranslator.abort(e, e, mdTxnCtx); diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java index 204e8aa..a28666a 100644 --- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java +++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java @@ -253,7 +253,7 @@ (Query) fStatements.get(1)); } return ((QueryTranslator) statementExecutor).handleInsertUpsertStatement(metadataProvider, fStatements.get(1), - hcc, hdc, ResultDelivery.ASYNC, null, stats, true, null); + hcc, hdc, ResultDelivery.ASYNC, null, stats, true, null, null, null); } @Override diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java index be5bedb..aaf2bfa 100644 --- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java +++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java @@ -176,7 +176,7 @@ private Pair<JobSpecification, PrecompiledType> createProcedureJob(IStatementExecutor statementExecutor, MetadataProvider metadataProvider, IHyracksClientConnection hcc, Stats stats) - throws Exception { + throws Exception { if (getProcedureBodyStatement().getKind() == Statement.Kind.INSERT) { if (!varList.isEmpty()) { throw new CompilationException("Insert procedures cannot have parameters"); @@ -185,9 +185,8 @@ dependencies.get(0).add(Arrays.asList( ((QueryTranslator) statementExecutor).getActiveDataverse(insertStatement.getDataverseName()), insertStatement.getDatasetName().getValue())); - return new Pair<>( - ((QueryTranslator) statementExecutor).handleInsertUpsertStatement(metadataProvider, - getProcedureBodyStatement(), hcc, null, ResultDelivery.ASYNC, null, stats, true, null), + return new Pair<>(((QueryTranslator) statementExecutor).handleInsertUpsertStatement(metadataProvider, + getProcedureBodyStatement(), hcc, null, ResultDelivery.ASYNC, null, stats, true, null, null, null), PrecompiledType.INSERT); } else if (getProcedureBodyStatement().getKind() == Statement.Kind.QUERY) { SqlppRewriterFactory fact = new SqlppRewriterFactory(); @@ -209,7 +208,7 @@ metadataProvider); Pair<JobSpecification, PrecompiledType> pair = new Pair<>(((QueryTranslator) statementExecutor).handleDeleteStatement(metadataProvider, - getProcedureBodyStatement(), hcc, true), PrecompiledType.DELETE); + getProcedureBodyStatement(), hcc, true, null, null), PrecompiledType.DELETE); return pair; } else { throw new CompilationException("Procedure can only execute a single delete, insert, or query"); diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/recovery/BADGlobalRecoveryManager.java b/asterix-bad/src/main/java/org/apache/asterix/bad/recovery/BADGlobalRecoveryManager.java index d34d170..89d940e 100644 --- a/asterix-bad/src/main/java/org/apache/asterix/bad/recovery/BADGlobalRecoveryManager.java +++ b/asterix-bad/src/main/java/org/apache/asterix/bad/recovery/BADGlobalRecoveryManager.java @@ -122,7 +122,7 @@ listener.suspend(); activeEventHandler.registerListener(listener); BADJobService.redeployJobSpec(entityId, channel.getChannelBody(), metadataProvider, badStatementExecutor, - hcc, new RequestParameters(null, null, null, null, null, null), true); + hcc, new RequestParameters(null, null, null, null, null, null, null), true); ScheduledExecutorService ses = BADJobService.startRepetitiveDeployedJobSpec(listener.getDeployedJobSpecId(), hcc, @@ -149,7 +149,7 @@ new HyracksDataset(hcc, appCtx.getCompilerProperties().getFrameSize(), ResultReader.NUM_READERS), new ResultProperties(IStatementExecutor.ResultDelivery.IMMEDIATE), - new IStatementExecutor.Stats(), null, null, null), + new IStatementExecutor.Stats(), null, null, null, null), true); metadataProvider.getLocks().unlock(); //Log that the procedure stopped by cluster restart. Procedure is available again now. -- To view, visit https://asterix-gerrit.ics.uci.edu/2712 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: merged Gerrit-Change-Id: Icce06a1548a4f4150545c1fda7e5be3608472af5 Gerrit-PatchSet: 6 Gerrit-Project: asterixdb-bad Gerrit-Branch: master Gerrit-Owner: Till Westmann <ti...@apache.org> Gerrit-Reviewer: Dmitry Lychagin <dmitry.lycha...@couchbase.com> Gerrit-Reviewer: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Gerrit-Reviewer: Steven Jacobs <sjaco...@ucr.edu> Gerrit-Reviewer: Till Westmann <ti...@apache.org> Gerrit-Reviewer: Xikui Wang <xkk...@gmail.com>