Steven Jacobs has uploaded a new change for review.
https://asterix-gerrit.ics.uci.edu/1524
Change subject: Enable Feed Changes to work with BAD project
......................................................................
Enable Feed Changes to work with BAD project
Extracts the shared listener state into an abstract ActiveEntityEventsListener base class
Enables listeners of repeatable jobs to survive after a job execution finishes
Change-Id: Ib62184b67aff564475ef9b58790ff96409195b77
---
M asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveJobNotificationHandler.java
M asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java
A asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/ActiveEntityEventsListener.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedEventsListener.java
4 files changed, 78 insertions(+), 27 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/24/1524/1
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveJobNotificationHandler.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveJobNotificationHandler.java
index e4491bd..eba0fee 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveJobNotificationHandler.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveJobNotificationHandler.java
@@ -57,11 +57,8 @@
LOGGER.log(Level.FINER, "Next event is of type " +
event.getEventKind());
LOGGER.log(Level.FINER, "Notifying the listener");
listener.notify(event);
- if (event.getEventKind() == Kind.JOB_FINISHED) {
- LOGGER.log(Level.FINER, "Removing the job");
- jobId2ActiveJobInfos.remove(event.getJobId());
- LOGGER.log(Level.FINER, "Removing the listener since
it is not active anymore");
- entityEventListeners.remove(listener.getEntityId());
+ if (event.getEventKind() == Kind.JOB_FINISHED &&
!listener.isRepeatable()) {
+ removeJob(event.getJobId(), listener);
}
} else {
LOGGER.log(Level.SEVERE, "Entity not found for received
message for job " + event.getJobId());
@@ -75,6 +72,13 @@
LOGGER.log(Level.INFO, "Stopped " +
ActiveJobNotificationHandler.class.getSimpleName());
}
+ public void removeJob(JobId hyracksJobId, IActiveEntityEventsListener
listener) {
+ LOGGER.log(Level.FINER, "Removing the job");
+ jobId2ActiveJobInfos.remove(hyracksJobId);
+ LOGGER.log(Level.FINER, "Removing the listener since it is not active
anymore");
+ entityEventListeners.remove(listener.getEntityId());
+ }
+
public IActiveEntityEventsListener getActiveEntityListener(EntityId
entityId) {
if (DEBUG) {
LOGGER.log(Level.WARNING, "getActiveEntityListener(EntityId
entityId) was called with entity " + entityId);
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java
index ee8e776..7f3554a 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java
@@ -41,6 +41,12 @@
ActivityState getState();
/**
+ * @return whether the listener is for a repeatable job;
+ * listeners of repeatable jobs are not removed when the job finishes
+ */
+ boolean isRepeatable();
+
+ /**
* get a subscriber that waits till state has been reached.
*
* @param state
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/ActiveEntityEventsListener.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/ActiveEntityEventsListener.java
new file mode 100644
index 0000000..9472ea7
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/ActiveEntityEventsListener.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.management;
+
+import java.util.List;
+
+import org.apache.asterix.active.ActivityState;
+import org.apache.asterix.active.EntityId;
+import org.apache.asterix.active.IActiveEntityEventsListener;
+import org.apache.asterix.common.metadata.IDataset;
+import org.apache.hyracks.api.job.JobId;
+
+public abstract class ActiveEntityEventsListener implements
IActiveEntityEventsListener {
+
+ // members
+ protected EntityId entityId;
+ protected List<IDataset> datasets;
+ protected volatile ActivityState state;
+ protected JobId jobId;
+ protected boolean repeatableJob;
+
+ @Override
+ public EntityId getEntityId() {
+ return entityId;
+ }
+
+ @Override
+ public ActivityState getState() {
+ return state;
+ }
+
+ @Override
+ public boolean isEntityUsingDataset(IDataset dataset) {
+ return datasets.contains(dataset);
+ }
+
+ public JobId getJobId() {
+ return jobId;
+ }
+
+ @Override
+ public boolean isRepeatable() {
+ return repeatableJob;
+ }
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedEventsListener.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedEventsListener.java
index 2a87cab..820812a 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedEventsListener.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedEventsListener.java
@@ -27,7 +27,6 @@
import org.apache.asterix.active.ActiveEvent;
import org.apache.asterix.active.ActivityState;
import org.apache.asterix.active.EntityId;
-import org.apache.asterix.active.IActiveEntityEventsListener;
import org.apache.asterix.active.IActiveEventSubscriber;
import org.apache.asterix.active.message.ActivePartitionMessage;
import org.apache.asterix.common.metadata.IDataset;
@@ -36,20 +35,15 @@
import org.apache.asterix.runtime.utils.AppContextInfo;
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.JobStatus;
-public class FeedEventsListener implements IActiveEntityEventsListener {
+public class FeedEventsListener extends ActiveEntityEventsListener {
// constants
private static final Logger LOGGER =
Logger.getLogger(FeedEventsListener.class.getName());
// members
- private final EntityId entityId;
- private final List<IDataset> datasets;
private final String[] sources;
private final List<IActiveEventSubscriber> subscribers;
- private volatile ActivityState state;
private int numRegistered;
- private JobId jobId;
public FeedEventsListener(EntityId entityId, List<IDataset> datasets,
String[] sources) {
this.entityId = entityId;
@@ -57,6 +51,7 @@
this.sources = sources;
subscribers = new ArrayList<>();
state = ActivityState.STOPPED;
+ repeatableJob = false;
}
@Override
@@ -119,16 +114,6 @@
}
@Override
- public EntityId getEntityId() {
- return entityId;
- }
-
- @Override
- public ActivityState getState() {
- return state;
- }
-
- @Override
public IActiveEventSubscriber subscribe(ActivityState state) throws
HyracksDataException {
if (state != ActivityState.STARTED && state != ActivityState.STOPPED) {
throw new HyracksDataException("Can only wait for STARTED or
STOPPED state");
@@ -148,11 +133,6 @@
FeedEventSubscriber subscriber = new FeedEventSubscriber(this, state);
subscribers.add(subscriber);
return subscriber;
- }
-
- @Override
- public boolean isEntityUsingDataset(IDataset dataset) {
- return datasets.contains(dataset);
}
public String[] getSources() {
--
To view, visit https://asterix-gerrit.ics.uci.edu/1524
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: Ib62184b67aff564475ef9b58790ff96409195b77
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Steven Jacobs <[email protected]>