Michael Blow has uploaded a new change for review. https://asterix-gerrit.ics.uci.edu/2361
Change subject: Coordinated change for https://asterix-gerrit.ics.uci.edu/#/c/2344/ ...................................................................... Coordinated change for https://asterix-gerrit.ics.uci.edu/#/c/2344/ Change-Id: I973c67448d4b34c4521d0abd23c999397e88cf67 --- M asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java M asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java 2 files changed, 14 insertions(+), 14 deletions(-) git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb-bad refs/changes/61/2361/1 diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java index feaa3ca..53baf6d 100644 --- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java +++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java @@ -40,6 +40,7 @@ import org.apache.asterix.bad.metadata.Channel; import org.apache.asterix.bad.metadata.DeployedJobSpecEventListener; import org.apache.asterix.bad.metadata.DeployedJobSpecEventListener.PrecompiledType; +import org.apache.asterix.common.transactions.ITxnIdFactory; import org.apache.asterix.common.config.DatasetConfig.DatasetType; import org.apache.asterix.common.config.DatasetConfig.IndexType; import org.apache.asterix.common.dataflow.ICcApplicationContext; @@ -256,11 +257,11 @@ } private void setupExecutorJob(EntityId entityId, JobSpecification channeljobSpec, IHyracksClientConnection hcc, - DeployedJobSpecEventListener listener) throws Exception { + DeployedJobSpecEventListener listener, ITxnIdFactory txnIdFactory) throws Exception { if (channeljobSpec != null) { DeployedJobSpecId destributedId = hcc.deployJobSpec(channeljobSpec); ScheduledExecutorService ses = DeployedJobService.startRepetitiveDeployedJobSpec(destributedId, hcc, - ChannelJobService.findPeriod(duration), new HashMap<>(), entityId); + ChannelJobService.findPeriod(duration), new HashMap<>(), entityId, txnIdFactory); listener.storeDistributedInfo(destributedId, ses, null, null); } @@ -331,7 +332,7 @@ activeEventHandler.registerListener(listener); } - setupExecutorJob(entityId, channeljobSpec, hcc, listener); + setupExecutorJob(entityId, channeljobSpec, hcc, listener, metadataProvider.getTxnIdFactory()); channel = new Channel(dataverse, channelName.getValue(), subscriptionsTableName, resultsTableName, function, duration, null); diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java index 7ab7f95..7db935a 100644 --- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java +++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java @@ -18,6 +18,12 @@ */ package org.apache.asterix.bad.lang.statement; +import java.io.DataOutput; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ScheduledExecutorService; + import org.apache.asterix.active.DeployedJobService; import org.apache.asterix.active.EntityId; import org.apache.asterix.algebra.extension.IExtensionStatement; @@ -45,7 +51,6 @@ import org.apache.asterix.metadata.declared.MetadataProvider; import org.apache.asterix.om.base.AString; import org.apache.asterix.om.base.IAObject; -import org.apache.asterix.transaction.management.service.transaction.TxnIdFactory; import org.apache.asterix.translator.ConstantHelper; import org.apache.asterix.translator.IRequestParameters; import org.apache.asterix.translator.IStatementExecutor; @@ -56,12 +61,6 @@ import org.apache.hyracks.api.job.DeployedJobSpecId; import org.apache.hyracks.api.job.JobId; import org.apache.hyracks.data.std.util.ArrayBackedValueStorage; - -import java.io.DataOutput; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ScheduledExecutorService; public class ExecuteProcedureStatement implements IExtensionStatement { @@ -133,7 +132,7 @@ if (procedure.getDuration().equals("")) { //Add the Asterix Transaction Id to the map - long newTxId = TxnIdFactory.create().getId(); + long newTxId = metadataProvider.getTxnIdFactory().create().getId(); contextRuntimeVarMap.put(BADConstants.TRANSACTION_ID_PARAMETER_NAME, String.valueOf(newTxId).getBytes()); jobId = hcc.startJob(deployedJobSpecId, contextRuntimeVarMap); @@ -153,9 +152,9 @@ } } else { - ScheduledExecutorService ses = - DeployedJobService.startRepetitiveDeployedJobSpec(deployedJobSpecId, hcc, - ChannelJobService.findPeriod(procedure.getDuration()), contextRuntimeVarMap, entityId); + ScheduledExecutorService ses = DeployedJobService.startRepetitiveDeployedJobSpec(deployedJobSpecId, hcc, + ChannelJobService.findPeriod(procedure.getDuration()), contextRuntimeVarMap, entityId, + metadataProvider.getTxnIdFactory()); listener.storeDistributedInfo(deployedJobSpecId, ses, listener.getResultDataset(), listener.getResultId()); } -- To view, visit https://asterix-gerrit.ics.uci.edu/2361 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: newchange Gerrit-Change-Id: I973c67448d4b34c4521d0abd23c999397e88cf67 Gerrit-PatchSet: 1 Gerrit-Project: asterixdb-bad Gerrit-Branch: master Gerrit-Owner: Michael Blow <mb...@apache.org>