Steven Jacobs has uploaded a new change for review. https://asterix-gerrit.ics.uci.edu/1524
Change subject: Enable Feed Changes to work with BAD project ...................................................................... Enable Feed Changes to work with BAD project Extracts the ActiveListener Enables listeners to survive after job executions Change-Id: Ib62184b67aff564475ef9b58790ff96409195b77 --- M asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveJobNotificationHandler.java M asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java A asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/ActiveEntityEventsListener.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedEventsListener.java 4 files changed, 78 insertions(+), 27 deletions(-) git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/24/1524/1 diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveJobNotificationHandler.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveJobNotificationHandler.java index e4491bd..eba0fee 100644 --- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveJobNotificationHandler.java +++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveJobNotificationHandler.java @@ -57,11 +57,8 @@ LOGGER.log(Level.FINER, "Next event is of type " + event.getEventKind()); LOGGER.log(Level.FINER, "Notifying the listener"); listener.notify(event); - if (event.getEventKind() == Kind.JOB_FINISHED) { - LOGGER.log(Level.FINER, "Removing the job"); - jobId2ActiveJobInfos.remove(event.getJobId()); - LOGGER.log(Level.FINER, "Removing the listener since it is not active anymore"); - entityEventListeners.remove(listener.getEntityId()); + if (event.getEventKind() == Kind.JOB_FINISHED && !listener.isRepeatable()) { + removeJob(event.getJobId(), listener); } } else { LOGGER.log(Level.SEVERE, "Entity not found for received message for job " + event.getJobId()); @@ -75,6 +72,13 @@ LOGGER.log(Level.INFO, "Stopped " + ActiveJobNotificationHandler.class.getSimpleName()); } + public void removeJob(JobId hyracksJobId, IActiveEntityEventsListener listener) { + LOGGER.log(Level.FINER, "Removing the job"); + jobId2ActiveJobInfos.remove(hyracksJobId); + LOGGER.log(Level.FINER, "Removing the listener since it is not active anymore"); + entityEventListeners.remove(listener.getEntityId()); + } + public IActiveEntityEventsListener getActiveEntityListener(EntityId entityId) { if (DEBUG) { LOGGER.log(Level.WARNING, "getActiveEntityListener(EntityId entityId) was called with entity " + entityId); diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java index ee8e776..7f3554a 100644 --- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java +++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java @@ -41,6 +41,12 @@ ActivityState getState(); /** + * @return whether the listener is for a repeatable job + * Repeatable jobs won't delete listeners after job is finished + */ + boolean isRepeatable(); + + /** * get a subscriber that waits till state has been reached. * * @param state diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/ActiveEntityEventsListener.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/ActiveEntityEventsListener.java new file mode 100644 index 0000000..9472ea7 --- /dev/null +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/ActiveEntityEventsListener.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.external.feed.management; + +import java.util.List; + +import org.apache.asterix.active.ActivityState; +import org.apache.asterix.active.EntityId; +import org.apache.asterix.active.IActiveEntityEventsListener; +import org.apache.asterix.common.metadata.IDataset; +import org.apache.hyracks.api.job.JobId; + +public abstract class ActiveEntityEventsListener implements IActiveEntityEventsListener { + + // members + protected EntityId entityId; + protected List<IDataset> datasets; + protected volatile ActivityState state; + protected JobId jobId; + protected boolean repeatableJob; + + @Override + public EntityId getEntityId() { + return entityId; + } + + @Override + public ActivityState getState() { + return state; + } + + @Override + public boolean isEntityUsingDataset(IDataset dataset) { + return datasets.contains(dataset); + } + + public JobId getJobId() { + return jobId; + } + + @Override + public boolean isRepeatable() { + return repeatableJob; + } +} diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedEventsListener.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedEventsListener.java index 2a87cab..820812a 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedEventsListener.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedEventsListener.java @@ -27,7 +27,6 @@ import org.apache.asterix.active.ActiveEvent; import org.apache.asterix.active.ActivityState; import org.apache.asterix.active.EntityId; -import org.apache.asterix.active.IActiveEntityEventsListener; import org.apache.asterix.active.IActiveEventSubscriber; import org.apache.asterix.active.message.ActivePartitionMessage; import org.apache.asterix.common.metadata.IDataset; @@ -36,20 +35,15 @@ import org.apache.asterix.runtime.utils.AppContextInfo; import org.apache.hyracks.api.client.IHyracksClientConnection; import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.api.job.JobId; import org.apache.hyracks.api.job.JobStatus; -public class FeedEventsListener implements IActiveEntityEventsListener { +public class FeedEventsListener extends ActiveEntityEventsListener { // constants private static final Logger LOGGER = Logger.getLogger(FeedEventsListener.class.getName()); // members - private final EntityId entityId; - private final List<IDataset> datasets; private final String[] sources; private final List<IActiveEventSubscriber> subscribers; - private volatile ActivityState state; private int numRegistered; - private JobId jobId; public FeedEventsListener(EntityId entityId, List<IDataset> datasets, String[] sources) { this.entityId = entityId; @@ -57,6 +51,7 @@ this.sources = sources; subscribers = new ArrayList<>(); state = ActivityState.STOPPED; + repeatableJob = false; } @Override @@ -119,16 +114,6 @@ } @Override - public EntityId getEntityId() { - return entityId; - } - - @Override - public ActivityState getState() { - return state; - } - - @Override public IActiveEventSubscriber subscribe(ActivityState state) throws HyracksDataException { if (state != ActivityState.STARTED && state != ActivityState.STOPPED) { throw new HyracksDataException("Can only wait for STARTED or STOPPED state"); @@ -148,11 +133,6 @@ FeedEventSubscriber subscriber = new FeedEventSubscriber(this, state); subscribers.add(subscriber); return subscriber; - } - - @Override - public boolean isEntityUsingDataset(IDataset dataset) { - return datasets.contains(dataset); } public String[] getSources() { -- To view, visit https://asterix-gerrit.ics.uci.edu/1524 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: newchange Gerrit-Change-Id: Ib62184b67aff564475ef9b58790ff96409195b77 Gerrit-PatchSet: 1 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: Steven Jacobs <sjaco...@ucr.edu>