Steven Jacobs has uploaded a new change for review. https://asterix-gerrit.ics.uci.edu/2249
Change subject: [ASTERIXDB-2214][FAIL] Prevent dropping of Functions Used By Active Entities ...................................................................... [ASTERIXDB-2214][FAIL] Prevent dropping of Functions Used By Active Entities Remove Specialized Feed checks for functional dependency Add Active Entity checks for functional dependency Change-Id: I62393b65eddc4c2520fc8a0f3f80960551f4a159 --- M asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java M asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveEventsListenerTest.java M asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java M asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/DummyFeedEventsListener.java M asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestEventsListener.java M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/functions/FunctionSignature.java M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IActiveEntityController.java M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/FeedConnection.java 11 files changed, 69 insertions(+), 91 deletions(-) git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/49/2249/1 diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java index 37120e4..d4d1ef6 100644 --- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java +++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java @@ -18,6 +18,7 @@ */ package org.apache.asterix.active; +import org.apache.asterix.common.functions.FunctionSignature; import org.apache.asterix.common.metadata.IDataset; import org.apache.hyracks.api.exceptions.HyracksDataException; @@ -52,6 +53,8 @@ */ boolean isEntityUsingDataset(IDataset dataset); + boolean dependsOnFunction(FunctionSignature function); + /** * subscribe to events. subscription ends when subscriber.done() returns true * diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java index 8cbc109..475ffa6 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java @@ -41,6 +41,7 @@ import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.asterix.common.exceptions.ErrorCode; import org.apache.asterix.common.exceptions.RuntimeDataException; +import org.apache.asterix.common.functions.FunctionSignature; import org.apache.asterix.common.messaging.api.ICCMessageBroker; import org.apache.asterix.common.messaging.api.INcAddressedMessage; import org.apache.asterix.common.metadata.IDataset; @@ -77,6 +78,7 @@ protected final IHyracksClientConnection hcc; protected final EntityId entityId; private final List<Dataset> datasets; + private final List<FunctionSignature> functions; protected final ActiveEvent statsUpdatedEvent; protected final String runtimeName; protected final IRetryPolicyFactory retryPolicyFactory; @@ -100,7 +102,7 @@ protected Exception recoverFailure; public ActiveEntityEventsListener(IStatementExecutor statementExecutor, ICcApplicationContext appCtx, - IHyracksClientConnection hcc, EntityId entityId, List<Dataset> datasets, + IHyracksClientConnection hcc, EntityId entityId, List<Dataset> datasets, List<FunctionSignature> functions, AlgebricksAbsolutePartitionConstraint locations, String runtimeName, IRetryPolicyFactory retryPolicyFactory) throws HyracksDataException { this.statementExecutor = statementExecutor; @@ -110,6 +112,7 @@ this.hcc = hcc; this.entityId = entityId; this.datasets = datasets; + this.functions = functions; this.retryPolicyFactory = retryPolicyFactory; this.state = ActivityState.STOPPED; this.statsTimestamp = -1; @@ -178,7 +181,6 @@ } } - @SuppressWarnings("unchecked") protected void finish(ActiveEvent event) throws HyracksDataException { LOGGER.log(level, "the job " + jobId + " finished"); if (numRegistered != numDeRegistered) { @@ -233,6 +235,11 @@ } @Override + public synchronized boolean dependsOnFunction(FunctionSignature function) { + return getFunctions().contains(function); + } + + @Override public synchronized void remove(Dataset dataset) throws HyracksDataException { if (isActive()) { throw new RuntimeDataException(ErrorCode.CANNOT_REMOVE_DATASET_FROM_ACTIVE_ENTITY, entityId, state); @@ -272,7 +279,6 @@ return strBuilder.toString(); } - @SuppressWarnings("unchecked") @Override public void refreshStats(long timeout) throws HyracksDataException { LOGGER.log(level, "refreshStats called"); @@ -540,6 +546,11 @@ } @Override + public List<FunctionSignature> getFunctions() { + return functions; + } + + @Override public synchronized void replace(Dataset dataset) { if (getDatasets().contains(dataset)) { getDatasets().remove(dataset); diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java index c0ce6ec..64bc12a 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java @@ -25,17 +25,12 @@ import org.apache.asterix.active.EntityId; import org.apache.asterix.active.IActiveEntityEventSubscriber; import org.apache.asterix.active.IRetryPolicyFactory; -import org.apache.asterix.app.translator.DefaultStatementExecutorFactory; -import org.apache.asterix.app.translator.QueryTranslator; -import org.apache.asterix.common.context.IStorageComponentProvider; import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.asterix.common.exceptions.ErrorCode; import org.apache.asterix.common.exceptions.RuntimeDataException; +import org.apache.asterix.common.functions.FunctionSignature; import org.apache.asterix.common.utils.JobUtils; -import org.apache.asterix.compiler.provider.AqlCompilationProvider; -import org.apache.asterix.compiler.provider.ILangCompilationProvider; import org.apache.asterix.external.feed.watch.WaitForStateSubscriber; -import org.apache.asterix.file.StorageComponentProvider; import org.apache.asterix.lang.common.statement.StartFeedStatement; import org.apache.asterix.metadata.declared.MetadataProvider; import org.apache.asterix.metadata.entities.Dataset; @@ -45,7 +40,6 @@ import org.apache.asterix.utils.FeedOperations; import org.apache.commons.lang3.tuple.Pair; import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; -import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.api.client.IHyracksClientConnection; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.job.JobSpecification; @@ -56,10 +50,11 @@ private final List<FeedConnection> feedConnections; public FeedEventsListener(IStatementExecutor statementExecutor, ICcApplicationContext appCtx, - IHyracksClientConnection hcc, EntityId entityId, List<Dataset> datasets, + IHyracksClientConnection hcc, EntityId entityId, List<Dataset> datasets, List<FunctionSignature> functions, AlgebricksAbsolutePartitionConstraint locations, String runtimeName, IRetryPolicyFactory retryPolicyFactory, Feed feed, final List<FeedConnection> feedConnections) throws HyracksDataException { - super(statementExecutor, appCtx, hcc, entityId, datasets, locations, runtimeName, retryPolicyFactory); + super(statementExecutor, appCtx, hcc, entityId, datasets, functions, locations, runtimeName, + retryPolicyFactory); this.feed = feed; this.feedConnections = feedConnections; } diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java index c69f5dc..13ab95f 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java @@ -123,6 +123,7 @@ import org.apache.asterix.lang.common.statement.TypeDropStatement; import org.apache.asterix.lang.common.statement.WriteStatement; import org.apache.asterix.lang.common.struct.Identifier; +import org.apache.asterix.lang.common.util.MergePolicyUtils; import org.apache.asterix.lang.sqlpp.rewrites.SqlppRewriterFactory; import org.apache.asterix.metadata.IDatasetDetails; import org.apache.asterix.metadata.MetadataManager; @@ -157,7 +158,6 @@ import org.apache.asterix.om.types.ATypeTag; import org.apache.asterix.om.types.IAType; import org.apache.asterix.om.types.TypeSignature; -import org.apache.asterix.lang.common.util.MergePolicyUtils; import org.apache.asterix.transaction.management.service.transaction.DatasetIdFactory; import org.apache.asterix.translator.AbstractLangTranslator; import org.apache.asterix.translator.CompiledStatements.CompiledDeleteStatement; @@ -1221,8 +1221,9 @@ List<Function> functionsInDataverse = MetadataManager.INSTANCE.getDataverseFunctions(mdTxnCtx, dataverseName); for (Function function : functionsInDataverse) { - if (checkWhetherFunctionIsBeingUsed(mdTxnCtx, function.getDataverseName(), function.getName(), - function.getArity(), dataverseName)) { + if (checkWhetherFunctionIsBeingUsed( + new FunctionSignature(function.getDataverseName(), function.getName(), function.getArity()), + false)) { throw new MetadataException(ErrorCode.METADATA_DROP_FUCTION_IN_USE, function.getDataverseName() + "." + function.getName() + "@" + function.getArity()); } @@ -1704,22 +1705,17 @@ } } - protected boolean checkWhetherFunctionIsBeingUsed(MetadataTransactionContext ctx, String dataverseName, - String functionName, int arity, String currentDataverse) throws AlgebricksException { - List<Dataverse> allDataverses = MetadataManager.INSTANCE.getDataverses(ctx); - for (Dataverse dataverse : allDataverses) { - if (currentDataverse != null && dataverse.getDataverseName().equals(currentDataverse)) { + protected boolean checkWhetherFunctionIsBeingUsed(FunctionSignature function, boolean checkLocalDataverse) + throws AlgebricksException { + ActiveNotificationHandler activeEventHandler = + (ActiveNotificationHandler) appCtx.getActiveNotificationHandler(); + IActiveEntityEventsListener[] listeners = activeEventHandler.getEventListeners(); + for (IActiveEntityEventsListener listener : listeners) { + if (!checkLocalDataverse && listener.getEntityId().getDataverse().equals(function.getNamespace())) { continue; } - List<Feed> feeds = MetadataManager.INSTANCE.getFeeds(ctx, dataverse.getDataverseName()); - for (Feed feed : feeds) { - List<FeedConnection> feedConnections = MetadataManager.INSTANCE.getFeedConections(ctx, - dataverse.getDataverseName(), feed.getFeedName()); - for (FeedConnection conn : feedConnections) { - if (conn.containsFunction(dataverseName, functionName, arity)) { - return true; - } - } + if (listener.dependsOnFunction(function)) { + return true; } } return false; @@ -1737,8 +1733,7 @@ Function function = MetadataManager.INSTANCE.getFunction(mdTxnCtx, signature); if (function == null && !stmtDropFunction.getIfExists()) { throw new AlgebricksException("Unknonw function " + signature); - } else if (checkWhetherFunctionIsBeingUsed(mdTxnCtx, signature.getNamespace(), signature.getName(), - signature.getArity(), null)) { + } else if (checkWhetherFunctionIsBeingUsed(signature, true)) { throw new MetadataException(ErrorCode.METADATA_DROP_FUCTION_IN_USE, signature); } else { MetadataManager.INSTANCE.dropFunction(mdTxnCtx, signature); @@ -2126,13 +2121,15 @@ if (listener == null) { // Prepare policy List<Dataset> datasets = new ArrayList<>(); + List<FunctionSignature> functions = new ArrayList<>(); for (FeedConnection connection : feedConnections) { Dataset ds = metadataProvider.findDataset(connection.getDataverseName(), connection.getDatasetName()); datasets.add(ds); + functions.addAll(connection.getAppliedFunctions()); } listener = new FeedEventsListener(this, metadataProvider.getApplicationContext(), hcc, entityId, - datasets, null, FeedIntakeOperatorNodePushable.class.getSimpleName(), + datasets, functions, null, FeedIntakeOperatorNodePushable.class.getSimpleName(), NoRetryPolicyFactory.INSTANCE, feed, feedConnections); } MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveEventsListenerTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveEventsListenerTest.java index 40d4f6a..17c21ae 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveEventsListenerTest.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveEventsListenerTest.java @@ -135,7 +135,7 @@ nodeControllers[0] = new TestNodeControllerActor(nodes[0], clusterController); nodeControllers[1] = new TestNodeControllerActor(nodes[1], clusterController); listener = new TestEventsListener(clusterController, nodeControllers, jobIdFactory, entityId, - new ArrayList<>(allDatasets), statementExecutor, appCtx, hcc, locations, + new ArrayList<>(allDatasets), new ArrayList<>(), statementExecutor, appCtx, hcc, locations, new InfiniteRetryPolicyFactory()); users = new TestUserActor[3]; users[0] = newUser("Till", appCtx); @@ -346,7 +346,6 @@ Assert.assertEquals(ActivityState.RUNNING, listener.getState()); } - @SuppressWarnings("deprecation") @Test public void testRecovery() throws Exception { testStartWhenStartSucceed(); @@ -459,12 +458,11 @@ Assert.assertEquals(ActivityState.STOPPED, listener.getState()); } - @SuppressWarnings("deprecation") @Test public void testRecoveryFailureAfterOneAttemptCompilationFailure() throws Exception { handler.unregisterListener(listener); listener = new TestEventsListener(clusterController, nodeControllers, jobIdFactory, entityId, - new ArrayList<>(allDatasets), statementExecutor, appCtx, hcc, locations, + new ArrayList<>(allDatasets), new ArrayList<>(), statementExecutor, appCtx, hcc, locations, new CountRetryPolicyFactory(1)); testStartWhenStartSucceed(); WaitForStateSubscriber tempFailSubscriber = @@ -503,12 +501,11 @@ Assert.assertEquals(ActivityState.STOPPED, listener.getState()); } - @SuppressWarnings("deprecation") @Test public void testRecoveryFailureAfterOneAttemptRuntimeFailure() throws Exception { handler.unregisterListener(listener); listener = new TestEventsListener(clusterController, nodeControllers, jobIdFactory, entityId, - new ArrayList<>(allDatasets), statementExecutor, appCtx, hcc, locations, + new ArrayList<>(allDatasets), new ArrayList<>(), statementExecutor, appCtx, hcc, locations, new CountRetryPolicyFactory(1)); testStartWhenStartSucceed(); WaitForStateSubscriber tempFailSubscriber = @@ -523,12 +520,12 @@ Assert.assertEquals(ActivityState.PERMANENTLY_FAILED, listener.getState()); } - @SuppressWarnings("deprecation") @Test public void testRecoveryFailure() throws Exception { handler.unregisterListener(listener); listener = new TestEventsListener(clusterController, nodeControllers, jobIdFactory, entityId, - new ArrayList<>(allDatasets), statementExecutor, appCtx, hcc, locations, NoRetryPolicyFactory.INSTANCE); + new ArrayList<>(allDatasets), new ArrayList<>(), statementExecutor, appCtx, hcc, locations, + NoRetryPolicyFactory.INSTANCE); testStartWhenStartSucceed(); WaitForStateSubscriber tempFailSubscriber = new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.TEMPORARILY_FAILED)); @@ -541,7 +538,6 @@ Assert.assertEquals(ActivityState.PERMANENTLY_FAILED, listener.getState()); } - @SuppressWarnings("deprecation") @Test public void testStopDuringRecoveryAttemptThatSucceeds() throws Exception { testStartWhenStartSucceed(); @@ -569,7 +565,6 @@ Assert.assertEquals(ActivityState.STOPPED, listener.getState()); } - @SuppressWarnings("deprecation") @Test public void testStopDuringRecoveryAttemptThatFailsCompile() throws Exception { testStartWhenStartSucceed(); @@ -597,7 +592,6 @@ Assert.assertEquals(ActivityState.STOPPED, listener.getState()); } - @SuppressWarnings("deprecation") @Test public void testStopDuringRecoveryAttemptThatFailsRuntime() throws Exception { testStartWhenStartSucceed(); @@ -625,7 +619,6 @@ Assert.assertEquals(ActivityState.STOPPED, listener.getState()); } - @SuppressWarnings("deprecation") @Test public void testStartDuringRecoveryAttemptThatSucceeds() throws Exception { testStartWhenStartSucceed(); @@ -651,7 +644,6 @@ Assert.assertEquals(ActivityState.RUNNING, listener.getState()); } - @SuppressWarnings("deprecation") @Test public void testStartDuringRecoveryAttemptThatFailsCompile() throws Exception { testStartWhenStartSucceed(); @@ -676,7 +668,6 @@ assertFailure(action, ErrorCode.ACTIVE_ENTITY_ALREADY_STARTED); } - @SuppressWarnings("deprecation") @Test public void testStartDuringRecoveryAttemptThatFailsRuntime() throws Exception { testStartWhenStartSucceed(); @@ -700,7 +691,6 @@ assertFailure(action, ErrorCode.ACTIVE_ENTITY_ALREADY_STARTED); } - @SuppressWarnings("deprecation") @Test public void testSuspendDuringRecoveryAttemptThatSucceedsThenResumeSucceeds() throws Exception { testStartWhenStartSucceed(); @@ -731,7 +721,6 @@ Assert.assertEquals(ActivityState.RUNNING, listener.getState()); } - @SuppressWarnings("deprecation") @Test public void testSuspendDuringRecoveryAttemptThatSucceedsThenResumeFailsCompile() throws Exception { testStartWhenStartSucceed(); @@ -774,7 +763,6 @@ Assert.assertEquals(ActivityState.RUNNING, listener.getState()); } - @SuppressWarnings("deprecation") @Test public void testSuspendDuringRecoveryAttemptThatSucceedsThenResumeFailsRuntime() throws Exception { testStartWhenStartSucceed(); @@ -817,7 +805,6 @@ Assert.assertEquals(ActivityState.RUNNING, listener.getState()); } - @SuppressWarnings("deprecation") @Test public void testSuspendDuringRecoveryAttemptThatFailsCompile() throws Exception { testStartWhenStartSucceed(); @@ -859,7 +846,6 @@ Assert.assertEquals(ActivityState.RUNNING, listener.getState()); } - @SuppressWarnings("deprecation") @Test public void testSuspendDuringRecoveryAttemptThatFailsRuntime() throws Exception { testStartWhenStartSucceed(); @@ -901,7 +887,6 @@ Assert.assertEquals(ActivityState.RUNNING, listener.getState()); } - @SuppressWarnings("deprecation") @Test public void testCreateNewShadowDuringRecoveryAttemptThatSucceeds() throws Exception { testStartWhenStartSucceed(); @@ -929,7 +914,6 @@ Assert.assertEquals(clusterController.getAllDatasets().size(), listener.getDatasets().size()); } - @SuppressWarnings("deprecation") @Test public void testCreateNewShadowDuringRecoveryAttemptThatFailsCompile() throws Exception { testStartWhenStartSucceed(); @@ -956,7 +940,6 @@ Assert.assertEquals(clusterController.getAllDatasets().size(), listener.getDatasets().size()); } - @SuppressWarnings("deprecation") @Test public void testCreateNewShadowDuringRecoveryAttemptThatFailsRuntime() throws Exception { testStartWhenStartSucceed(); @@ -1056,7 +1039,6 @@ Assert.assertEquals(clusterController.getAllDatasets().size(), listener.getDatasets().size()); } - @SuppressWarnings("deprecation") @Test public void testDeleteShadowDuringRecoveryAttemptThatSucceeds() throws Exception { testStartWhenStartSucceed(); @@ -1082,7 +1064,6 @@ Assert.assertEquals(clusterController.getAllDatasets().size(), listener.getDatasets().size()); } - @SuppressWarnings("deprecation") @Test public void testDeleteShadowDuringRecoveryAttemptThatFailsCompile() throws Exception { testStartWhenStartSucceed(); @@ -1107,7 +1088,6 @@ Assert.assertEquals(clusterController.getAllDatasets().size(), listener.getDatasets().size()); } - @SuppressWarnings("deprecation") @Test public void testDeleteShadowDuringRecoveryAttemptThatFailsRuntime() throws Exception { testStartWhenStartSucceed(); @@ -1197,7 +1177,6 @@ Assert.assertEquals(clusterController.getAllDatasets().size(), listener.getDatasets().size()); } - @SuppressWarnings("deprecation") @Test public void testCreateNewIndexDuringRecoveryAttemptThatSucceeds() throws Exception { testStartWhenStartSucceed(); @@ -1221,7 +1200,6 @@ assertFailure(add, ErrorCode.CANNOT_ADD_INDEX_TO_DATASET_CONNECTED_TO_ACTIVE_ENTITY); } - @SuppressWarnings("deprecation") @Test public void testCreateNewIndexDuringRecoveryAttemptThatFailsCompile() throws Exception { testStartWhenStartSucceed(); @@ -1244,7 +1222,6 @@ assertFailure(add, ErrorCode.CANNOT_ADD_INDEX_TO_DATASET_CONNECTED_TO_ACTIVE_ENTITY); } - @SuppressWarnings("deprecation") @Test public void testCreateNewIndexDuringRecoveryAttemptThatFailsRuntime() throws Exception { testStartWhenStartSucceed(); @@ -1321,7 +1298,6 @@ assertFailure(add, ErrorCode.CANNOT_ADD_INDEX_TO_DATASET_CONNECTED_TO_ACTIVE_ENTITY); } - @SuppressWarnings("deprecation") @Test public void testDeleteIndexDuringRecoveryAttemptThatSucceeds() throws Exception { testStartWhenStartSucceed(); @@ -1345,7 +1321,6 @@ assertFailure(drop, ErrorCode.CANNOT_REMOVE_INDEX_FROM_DATASET_CONNECTED_TO_ACTIVE_ENTITY); } - @SuppressWarnings("deprecation") @Test public void testDeleteIndexDuringRecoveryAttemptThatFailsCompile() throws Exception { testStartWhenStartSucceed(); @@ -1368,7 +1343,6 @@ assertFailure(drop, ErrorCode.CANNOT_REMOVE_INDEX_FROM_DATASET_CONNECTED_TO_ACTIVE_ENTITY); } - @SuppressWarnings("deprecation") @Test public void testDeleteIndexDuringRecoveryAttemptThatFailsRuntime() throws Exception { testStartWhenStartSucceed(); @@ -1470,8 +1444,8 @@ Mockito.when(ccAppCtx.getStorageComponentProvider()).thenReturn(componentProvider); AlgebricksAbsolutePartitionConstraint locations = new AlgebricksAbsolutePartitionConstraint(nodes); additionalListeners[i] = listener = new TestEventsListener(clusterController, nodeControllers, jobIdFactory, - entityId, new ArrayList<>(allDatasets), statementExecutor, ccAppCtx, hcc, locations, - new InfiniteRetryPolicyFactory()); + entityId, new ArrayList<>(allDatasets), new ArrayList<>(), statementExecutor, ccAppCtx, hcc, + locations, new InfiniteRetryPolicyFactory()); } Action suspension = users[0].suspendAllActivities(handler); suspension.sync(); diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java index df7756b..5f9a2f7 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java @@ -37,6 +37,7 @@ import org.apache.asterix.app.cc.CCExtensionManager; import org.apache.asterix.app.nc.NCAppRuntimeContext; import org.apache.asterix.common.exceptions.ErrorCode; +import org.apache.asterix.common.functions.FunctionSignature; import org.apache.asterix.external.feed.watch.WaitForStateSubscriber; import org.apache.asterix.external.operators.FeedIntakeOperatorNodePushable; import org.apache.asterix.metadata.declared.MetadataProvider; @@ -74,6 +75,7 @@ ActiveRuntimeId activeRuntimeId = new ActiveRuntimeId(entityId, FeedIntakeOperatorNodePushable.class.getSimpleName(), 0); List<Dataset> datasetList = new ArrayList<>(); + List<FunctionSignature> functionList = new ArrayList<>(); AlgebricksAbsolutePartitionConstraint partitionConstraint = new AlgebricksAbsolutePartitionConstraint( new String[] { "asterix_nc1" }); String requestedStats; @@ -101,7 +103,8 @@ MetadataProvider mdProvider = new MetadataProvider(appCtx, null); // Add event listener ActiveEntityEventsListener eventsListener = new DummyFeedEventsListener(statementExecutor, appCtx, null, - entityId, datasetList, partitionConstraint, FeedIntakeOperatorNodePushable.class.getSimpleName(), + entityId, datasetList, functionList, partitionConstraint, + FeedIntakeOperatorNodePushable.class.getSimpleName(), NoRetryPolicyFactory.INSTANCE, null, Collections.emptyList()); // Register mock runtime NCAppRuntimeContext nc1AppCtx = (NCAppRuntimeContext) ExecutionTestUtil.integrationUtil.ncs[0] diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/DummyFeedEventsListener.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/DummyFeedEventsListener.java index c269803..bf86658 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/DummyFeedEventsListener.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/DummyFeedEventsListener.java @@ -28,6 +28,7 @@ import org.apache.asterix.active.IRetryPolicyFactory; import org.apache.asterix.app.active.FeedEventsListener; import org.apache.asterix.common.dataflow.ICcApplicationContext; +import org.apache.asterix.common.functions.FunctionSignature; import org.apache.asterix.external.feed.watch.WaitForStateSubscriber; import org.apache.asterix.metadata.declared.MetadataProvider; import org.apache.asterix.metadata.entities.Dataset; @@ -35,18 +36,17 @@ import org.apache.asterix.metadata.entities.FeedConnection; import org.apache.asterix.translator.IStatementExecutor; import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; -import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.api.client.IHyracksClientConnection; import org.apache.hyracks.api.exceptions.HyracksDataException; public class DummyFeedEventsListener extends FeedEventsListener { public DummyFeedEventsListener(IStatementExecutor statementExecutor, ICcApplicationContext appCtx, - IHyracksClientConnection hcc, EntityId entityId, List<Dataset> datasets, + IHyracksClientConnection hcc, EntityId entityId, List<Dataset> datasets, List<FunctionSignature> functions, AlgebricksAbsolutePartitionConstraint locations, String runtimeName, IRetryPolicyFactory retryPolicyFactory, Feed feed, List<FeedConnection> feedConnections) throws HyracksDataException { - super(statementExecutor, appCtx, hcc, entityId, datasets, locations, runtimeName, retryPolicyFactory, feed, - feedConnections); + super(statementExecutor, appCtx, hcc, entityId, datasets, functions, locations, runtimeName, retryPolicyFactory, + feed, feedConnections); } @Override diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestEventsListener.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestEventsListener.java index 1cedc96..fbbde6b 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestEventsListener.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestEventsListener.java @@ -30,6 +30,7 @@ import org.apache.asterix.app.active.ActiveEntityEventsListener; import org.apache.asterix.common.api.IMetadataLockManager; import org.apache.asterix.common.dataflow.ICcApplicationContext; +import org.apache.asterix.common.functions.FunctionSignature; import org.apache.asterix.common.metadata.LockList; import org.apache.asterix.external.feed.watch.WaitForStateSubscriber; import org.apache.asterix.metadata.declared.MetadataProvider; @@ -63,11 +64,12 @@ private Behavior onStop = Behavior.FAIL_COMPILE; public TestEventsListener(TestClusterControllerActor clusterController, TestNodeControllerActor[] nodeControllers, - JobIdFactory jobIdFactory, EntityId entityId, List<Dataset> datasets, IStatementExecutor statementExecutor, - ICcApplicationContext appCtx, IHyracksClientConnection hcc, AlgebricksAbsolutePartitionConstraint locations, - IRetryPolicyFactory retryPolicyFactory) throws HyracksDataException { - super(statementExecutor, appCtx, hcc, entityId, datasets, locations, TestEventsListener.class.getSimpleName(), - retryPolicyFactory); + JobIdFactory jobIdFactory, EntityId entityId, List<Dataset> datasets, List<FunctionSignature> functions, + IStatementExecutor statementExecutor, ICcApplicationContext appCtx, IHyracksClientConnection hcc, + AlgebricksAbsolutePartitionConstraint locations, IRetryPolicyFactory retryPolicyFactory) + throws HyracksDataException { + super(statementExecutor, appCtx, hcc, entityId, datasets, functions, locations, + TestEventsListener.class.getSimpleName(), retryPolicyFactory); this.clusterController = clusterController; this.nodeControllers = nodeControllers; this.jobIdFactory = jobIdFactory; @@ -84,7 +86,6 @@ } } - @SuppressWarnings("deprecation") private void failCompile(Behavior behavior) throws HyracksDataException { if (behavior == Behavior.FAIL_COMPILE || behavior == Behavior.STEP_FAIL_COMPILE) { throw new HyracksDataException("Compilation Failure"); @@ -103,7 +104,6 @@ } } - @SuppressWarnings("deprecation") @Override protected void doStart(MetadataProvider metadataProvider) throws HyracksDataException { step(onStart); @@ -142,7 +142,6 @@ } } - @SuppressWarnings("deprecation") @Override protected Void doStop(MetadataProvider metadataProvider) throws HyracksDataException { step(onStop); diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/functions/FunctionSignature.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/functions/FunctionSignature.java index d4b4215..6331c9f 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/functions/FunctionSignature.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/functions/FunctionSignature.java @@ -57,7 +57,7 @@ @Override public int hashCode() { - return (namespace + "." + name).hashCode(); + return (namespace + "." + name + "@" + arity).hashCode(); } public String getNamespace() { diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IActiveEntityController.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IActiveEntityController.java index c73a433..f9333f3 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IActiveEntityController.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IActiveEntityController.java @@ -21,6 +21,7 @@ import java.util.List; import org.apache.asterix.active.IActiveEntityEventsListener; +import org.apache.asterix.common.functions.FunctionSignature; import org.apache.asterix.metadata.declared.MetadataProvider; import org.apache.asterix.metadata.entities.Dataset; import org.apache.hyracks.api.exceptions.HyracksDataException; @@ -98,6 +99,11 @@ List<Dataset> getDatasets(); /** + * @return the list of associated functions + */ + List<FunctionSignature> getFunctions(); + + /** * replace the dataset object with the passed updated object * * @param target diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/FeedConnection.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/FeedConnection.java index 8e14ee5..6818cc5 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/FeedConnection.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/FeedConnection.java @@ -19,13 +19,13 @@ package org.apache.asterix.metadata.entities; +import java.util.List; + import org.apache.asterix.active.EntityId; import org.apache.asterix.common.functions.FunctionSignature; import org.apache.asterix.external.util.FeedUtils; import org.apache.asterix.metadata.MetadataCache; import org.apache.asterix.metadata.api.IMetadataEntity; - -import java.util.List; /** * Feed connection records the feed --> dataset mapping. @@ -111,15 +111,5 @@ public EntityId getFeedId() { return feedId; - } - - public boolean containsFunction(String dataverseName, String functionName, int arity) { - for (FunctionSignature signature : this.appliedFunctions) { - if (signature.getNamespace().equals(dataverseName) && signature.getName().equals(functionName) - && signature.getArity() == arity) { - return true; - } - } - return false; } } -- To view, visit https://asterix-gerrit.ics.uci.edu/2249 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: newchange Gerrit-Change-Id: I62393b65eddc4c2520fc8a0f3f80960551f4a159 Gerrit-PatchSet: 1 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: Steven Jacobs <sjaco...@ucr.edu>