Steven Jacobs has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/1524

Change subject: Enable Feed Changes to work with BAD project
......................................................................

Enable Feed Changes to work with BAD project

Extracts the ActiveListener
Enables listeners to survive after job executions

Change-Id: Ib62184b67aff564475ef9b58790ff96409195b77
---
M asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveJobNotificationHandler.java
M asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java
A asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/ActiveEntityEventsListener.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedEventsListener.java
4 files changed, 78 insertions(+), 27 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/24/1524/1

diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveJobNotificationHandler.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveJobNotificationHandler.java
index e4491bd..eba0fee 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveJobNotificationHandler.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveJobNotificationHandler.java
@@ -57,11 +57,8 @@
                     LOGGER.log(Level.FINER, "Next event is of type " + 
event.getEventKind());
                     LOGGER.log(Level.FINER, "Notifying the listener");
                     listener.notify(event);
-                    if (event.getEventKind() == Kind.JOB_FINISHED) {
-                        LOGGER.log(Level.FINER, "Removing the job");
-                        jobId2ActiveJobInfos.remove(event.getJobId());
-                        LOGGER.log(Level.FINER, "Removing the listener since 
it is not active anymore");
-                        entityEventListeners.remove(listener.getEntityId());
+                    if (event.getEventKind() == Kind.JOB_FINISHED && 
!listener.isRepeatable()) {
+                        removeJob(event.getJobId(), listener);
                     }
                 } else {
                     LOGGER.log(Level.SEVERE, "Entity not found for received 
message for job " + event.getJobId());
@@ -75,6 +72,13 @@
         LOGGER.log(Level.INFO, "Stopped " + 
ActiveJobNotificationHandler.class.getSimpleName());
     }
 
+    public void removeJob(JobId hyracksJobId, IActiveEntityEventsListener 
listener) {
+        LOGGER.log(Level.FINER, "Removing the job");
+        jobId2ActiveJobInfos.remove(hyracksJobId);
+        LOGGER.log(Level.FINER, "Removing the listener since it is not active 
anymore");
+        entityEventListeners.remove(listener.getEntityId());
+    }
+
     public IActiveEntityEventsListener getActiveEntityListener(EntityId 
entityId) {
         if (DEBUG) {
             LOGGER.log(Level.WARNING, "getActiveEntityListener(EntityId 
entityId) was called with entity " + entityId);
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java
index ee8e776..7f3554a 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java
@@ -41,6 +41,12 @@
     ActivityState getState();
 
     /**
+     * @return whether the listener is for a repeatable job
+     * Repeatable jobs won't delete listeners after job is finished
+     */
+    boolean isRepeatable();
+
+    /**
      * get a subscriber that waits till state has been reached.
      *
      * @param state
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/ActiveEntityEventsListener.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/ActiveEntityEventsListener.java
new file mode 100644
index 0000000..9472ea7
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/ActiveEntityEventsListener.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.management;
+
+import java.util.List;
+
+import org.apache.asterix.active.ActivityState;
+import org.apache.asterix.active.EntityId;
+import org.apache.asterix.active.IActiveEntityEventsListener;
+import org.apache.asterix.common.metadata.IDataset;
+import org.apache.hyracks.api.job.JobId;
+
+public abstract class ActiveEntityEventsListener implements 
IActiveEntityEventsListener {
+
+    // members
+    protected EntityId entityId;
+    protected List<IDataset> datasets;
+    protected volatile ActivityState state;
+    protected JobId jobId;
+    protected boolean repeatableJob;
+
+    @Override
+    public EntityId getEntityId() {
+        return entityId;
+    }
+
+    @Override
+    public ActivityState getState() {
+        return state;
+    }
+
+    @Override
+    public boolean isEntityUsingDataset(IDataset dataset) {
+        return datasets.contains(dataset);
+    }
+
+    public JobId getJobId() {
+        return jobId;
+    }
+
+    @Override
+    public boolean isRepeatable() {
+        return repeatableJob;
+    }
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedEventsListener.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedEventsListener.java
index 2a87cab..820812a 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedEventsListener.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedEventsListener.java
@@ -27,7 +27,6 @@
 import org.apache.asterix.active.ActiveEvent;
 import org.apache.asterix.active.ActivityState;
 import org.apache.asterix.active.EntityId;
-import org.apache.asterix.active.IActiveEntityEventsListener;
 import org.apache.asterix.active.IActiveEventSubscriber;
 import org.apache.asterix.active.message.ActivePartitionMessage;
 import org.apache.asterix.common.metadata.IDataset;
@@ -36,20 +35,15 @@
 import org.apache.asterix.runtime.utils.AppContextInfo;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobStatus;
 
-public class FeedEventsListener implements IActiveEntityEventsListener {
+public class FeedEventsListener extends ActiveEntityEventsListener {
     // constants
     private static final Logger LOGGER = 
Logger.getLogger(FeedEventsListener.class.getName());
     // members
-    private final EntityId entityId;
-    private final List<IDataset> datasets;
     private final String[] sources;
     private final List<IActiveEventSubscriber> subscribers;
-    private volatile ActivityState state;
     private int numRegistered;
-    private JobId jobId;
 
     public FeedEventsListener(EntityId entityId, List<IDataset> datasets, 
String[] sources) {
         this.entityId = entityId;
@@ -57,6 +51,7 @@
         this.sources = sources;
         subscribers = new ArrayList<>();
         state = ActivityState.STOPPED;
+        repeatableJob = false;
     }
 
     @Override
@@ -119,16 +114,6 @@
     }
 
     @Override
-    public EntityId getEntityId() {
-        return entityId;
-    }
-
-    @Override
-    public ActivityState getState() {
-        return state;
-    }
-
-    @Override
     public IActiveEventSubscriber subscribe(ActivityState state) throws 
HyracksDataException {
         if (state != ActivityState.STARTED && state != ActivityState.STOPPED) {
             throw new HyracksDataException("Can only wait for STARTED or 
STOPPED state");
@@ -148,11 +133,6 @@
         FeedEventSubscriber subscriber = new FeedEventSubscriber(this, state);
         subscribers.add(subscriber);
         return subscriber;
-    }
-
-    @Override
-    public boolean isEntityUsingDataset(IDataset dataset) {
-        return datasets.contains(dataset);
     }
 
     public String[] getSources() {

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/1524
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: Ib62184b67aff564475ef9b58790ff96409195b77
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Steven Jacobs <sjaco...@ucr.edu>

Reply via email to