http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplication.java
----------------------------------------------------------------------
diff --git 
a/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplication.java
 
b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplication.java
new file mode 100644
index 0000000..92a7d18
--- /dev/null
+++ 
b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplication.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.application;
+
+import org.apache.log4j.Logger;
+import org.apache.rya.indexing.pcj.fluo.app.util.PeriodicQueryUtil;
+import org.apache.rya.periodic.notification.api.BinPruner;
+import org.apache.rya.periodic.notification.api.BindingSetRecord;
+import org.apache.rya.periodic.notification.api.LifeCycle;
+import org.apache.rya.periodic.notification.api.NodeBin;
+import 
org.apache.rya.periodic.notification.api.NotificationCoordinatorExecutor;
+import org.apache.rya.periodic.notification.exporter.KafkaExporterExecutor;
+import 
org.apache.rya.periodic.notification.processor.NotificationProcessorExecutor;
+import org.apache.rya.periodic.notification.pruner.PeriodicQueryPrunerExecutor;
+import 
org.apache.rya.periodic.notification.registration.kafka.KafkaNotificationProvider;
+import org.openrdf.query.algebra.evaluation.function.Function;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * The PeriodicNotificationApplication runs the key components of the Periodic
+ * Query Service. It consists of a {@link KafkaNotificationProvider}, a
+ * {@link NotificationCoordinatorExecutor}, a
+ * {@link NotificationProcessorExecutor}, a {@link KafkaExporterExecutor}, and 
a
+ * {@link PeriodicQueryPrunerExecutor}. These services run in coordination with
+ * one another to perform the following tasks in the indicated order: <br>
+ * <li>Retrieve new requests to generate periodic notifications from Kafka
+ * <li>Register them with the {@link NotificationCoordinatorExecutor} to
+ * generate the periodic notifications
+ * <li>As notifications are generated, they are added to a work queue that is
+ * monitored by the {@link NotificationProcessorExecutor}.
+ * <li>The processor processes the notifications by reading all of the query
+ * results corresponding to the bin and query id indicated by the notification.
+ * <li>After reading the results, the processor adds a {@link BindingSetRecord}
+ * to a work queue monitored by the {@link KafkaExporterExecutor}.
+ * <li>The processor then adds a {@link NodeBin} to a workqueue monitored by 
the
+ * {@link BinPruner}
+ * <li>The exporter processes the BindingSetRecord by exporing the result to
+ * Kafka
+ * <li>The BinPruner processes the NodeBin by cleaning up the results for the
+ * indicated bin and query in Accumulo and Fluo. <br>
+ * <br>
+ * The purpose of this Periodic Query Service is to facilitate the ability to
+ * answer Periodic Queries using the Rya Fluo application, where a Periodic
+ * Query is any query requesting periodic updates about events that occurred
+ * within a given window of time of this instant. This is also known as a
+ * rolling window query. Period Queries can be expressed using SPARQL by
+ * including the {@link Function} indicated by the URI
+ * {@link PeriodicQueryUtil#PeriodicQueryURI}. The user must provide this
+ * Function with the following arguments: the temporal variable in the query
+ * that will be filtered on, the window of time that events must occur within,
+ * the period at which the user wants to receive updates, and the time unit. 
The
+ * following query requests all observations that occurred within the last
+ * minute and requests updates every 15 seconds. It also performs a count on
+ * those observations. <br>
+ * <br>
+ * <li>prefix function: http://org.apache.rya/function#
+ * <li>"prefix time: http://www.w3.org/2006/time#
+ * <li>"select (count(?obs) as ?total) where {
+ * <li>"Filter(function:periodic(?time, 1, .25, time:minutes))
+ * <li>"?obs uri:hasTime ?time.
+ * <li>"?obs uri:hasId ?id }
+ * <li>
+ */
+public class PeriodicNotificationApplication implements LifeCycle {
+
+    private static final Logger log = 
Logger.getLogger(PeriodicNotificationApplication.class);
+    private NotificationCoordinatorExecutor coordinator;
+    private KafkaNotificationProvider provider;
+    private PeriodicQueryPrunerExecutor pruner;
+    private NotificationProcessorExecutor processor;
+    private KafkaExporterExecutor exporter;
+    private boolean running = false;
+
+    /**
+     * Creates a PeriodicNotificationApplication
+     * @param provider - {@link KafkaNotificationProvider} that retrieves new 
Notificaiton requests from Kafka
+     * @param coordinator - {NotificationCoordinator} that manages 
PeriodicNotifications.
+     * @param processor - {@link NotificationProcessorExecutor} that processes 
PeriodicNotifications
+     * @param exporter - {@link KafkaExporterExecutor} that exports periodic 
results
+     * @param pruner - {@link PeriodicQueryPrunerExecutor} that cleans up old 
periodic bins
+     */
+    public PeriodicNotificationApplication(KafkaNotificationProvider provider, 
NotificationCoordinatorExecutor coordinator,
+            NotificationProcessorExecutor processor, KafkaExporterExecutor 
exporter, PeriodicQueryPrunerExecutor pruner) {
+        this.provider = Preconditions.checkNotNull(provider);
+        this.coordinator = Preconditions.checkNotNull(coordinator);
+        this.processor = Preconditions.checkNotNull(processor);
+        this.exporter = Preconditions.checkNotNull(exporter);
+        this.pruner = Preconditions.checkNotNull(pruner);
+    }
+
+    @Override
+    public void start() {
+        if (!running) {
+            log.info("Starting PeriodicNotificationApplication.");
+            coordinator.start();
+            provider.start();
+            processor.start();
+            pruner.start();
+            exporter.start();
+            running = true;
+        }
+    }
+
+    @Override
+    public void stop() {
+        log.info("Stopping PeriodicNotificationApplication.");
+        provider.stop();
+        coordinator.stop();
+        processor.stop();
+        pruner.stop();
+        exporter.stop();
+        running = false;
+    }
+
+    /**
+     * @return boolean indicating whether the application is running
+     */
+    @Override
+    public boolean currentlyRunning() {
+        return running;
+    }
+
+    public static Builder builder() {
+        return new Builder();
+    }
+
+    public static class Builder {
+
+        private PeriodicQueryPrunerExecutor pruner;
+        private KafkaNotificationProvider provider;
+        private NotificationProcessorExecutor processor;
+        private KafkaExporterExecutor exporter;
+        private NotificationCoordinatorExecutor coordinator;
+
+        /**
+         * Sets the PeriodicQueryPrunerExecutor.
+         * @param pruner - PeriodicQueryPrunerExecutor for cleaning up old 
periodic bins
+         * @return this Builder for chaining method calls
+         */
+        public Builder setPruner(PeriodicQueryPrunerExecutor pruner) {
+            this.pruner = pruner;
+            return this;
+        }
+
+        /**
+         * Sets the KafkaNotificationProvider
+         * @param provider - KafkaNotificationProvider for retrieving new 
periodic notification requests from Kafka
+         * @return this Builder for chaining method calls
+         */
+        public Builder setProvider(KafkaNotificationProvider provider) {
+            this.provider = provider;
+            return this;
+        }
+
+        public Builder setProcessor(NotificationProcessorExecutor processor) {
+            this.processor = processor;
+            return this;
+        }
+
+        /**
+         * Sets KafkaExporterExecutor
+         * @param exporter for exporting periodic query results to Kafka
+         * @return this Builder for chaining method calls
+         */
+        public Builder setExporter(KafkaExporterExecutor exporter) {
+            this.exporter = exporter;
+            return this;
+        }
+
+        /**
+         * Sets NotificationCoordinatorExecutor
+         * @param coordinator for managing and generating periodic 
notifications
+         * @return this Builder for chaining method calls
+         */
+        public Builder setCoordinator(NotificationCoordinatorExecutor 
coordinator) {
+            this.coordinator = coordinator;
+            return this;
+        }
+
+        /**
+         * Creates a PeriodicNotificationApplication
+         * @return PeriodicNotificationApplication for periodically polling 
Rya Fluo Application
+         */
+        public PeriodicNotificationApplication build() {
+            return new PeriodicNotificationApplication(provider, coordinator, 
processor, exporter, pruner);
+        }
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationConfiguration.java
----------------------------------------------------------------------
diff --git 
a/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationConfiguration.java
 
b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationConfiguration.java
new file mode 100644
index 0000000..d69efe5
--- /dev/null
+++ 
b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationConfiguration.java
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.application;
+
+import java.util.Properties;
+
+import org.apache.rya.accumulo.AccumuloRdfConfiguration;
+
+import jline.internal.Preconditions;
+
+/**
+ * Configuration object for creating a {@link PeriodicNotificationApplication}.
+ */
+public class PeriodicNotificationApplicationConfiguration extends 
AccumuloRdfConfiguration {
+
+    public static String FLUO_APP_NAME = "fluo.app.name";
+    public static String FLUO_TABLE_NAME = "fluo.table.name";
+    public static String KAFKA_BOOTSTRAP_SERVERS = "kafka.bootstrap.servers";
+    public static String NOTIFICATION_TOPIC = "kafka.notification.topic";
+    public static String NOTIFICATION_GROUP_ID = "kafka.notification.group.id";
+    public static String NOTIFICATION_CLIENT_ID = 
"kafka.notification.client.id";
+    public static String COORDINATOR_THREADS = "cep.coordinator.threads";
+    public static String PRODUCER_THREADS = "cep.producer.threads";
+    public static String EXPORTER_THREADS = "cep.exporter.threads";
+    public static String PROCESSOR_THREADS = "cep.processor.threads";
+    public static String PRUNER_THREADS = "cep.pruner.threads";
+    
+    public PeriodicNotificationApplicationConfiguration() {}
+    
+    /**
+     * Creates an PeriodicNotificationApplicationConfiguration object from a 
Properties file.  This method assumes
+     * that all values in the Properties file are Strings and that the 
Properties file uses the keys below.
+     * See 
rya.cep/cep.integration.tests/src/test/resources/properties/notification.properties
 for an example.
+     * <br>
+     * <ul>
+     * <li>"accumulo.auths" - String of Accumulo authorizations. Default is 
empty String.
+     * <li>"accumulo.instance" - Accumulo instance name (required)
+     * <li>"accumulo.user" - Accumulo user (required)
+     * <li>"accumulo.password" - Accumulo password (required)
+     * <li>"accumulo.rya.prefix" - Prefix for Accumulo backed Rya instance.  
Default is "rya_"
+     * <li>"accumulo.zookeepers" - Zookeepers for underlying Accumulo instance 
(required)
+     * <li>"fluo.app.name" - Name of Fluo Application (required)
+     * <li>"fluo.table.name" - Name of Fluo Table (required)
+     * <li>"kafka.bootstrap.servers" - Kafka Bootstrap servers for Producers 
and Consumers (required)
+     * <li>"kafka.notification.topic" - Topic to which new Periodic 
Notifications are published. Default is "notifications".
+     * <li>"kafka.notification.client.id" - Client Id for notification topic.  
Default is "consumer0"
+     * <li>"kafka.notification.group.id" - Group Id for notification topic.  
Default is "group0"
+     * <li>"cep.coordinator.threads" - Number of threads used by coordinator. 
Default is 1.
+     * <li>"cep.producer.threads" - Number of threads used by producer.  
Default is 1.
+     * <li>"cep.exporter.threads" - Number of threads used by exporter.  
Default is 1.
+     * <li>"cep.processor.threads" - Number of threads used by processor.  
Default is 1.
+     * <li>"cep.pruner.threads" - Number of threads used by pruner.  Default 
is 1.
+     * </ul>
+     * <br>
+     * @param props - Properties file containing Accumulo specific 
configuration parameters
+     * @return AccumumuloRdfConfiguration with properties set
+     */
+    public PeriodicNotificationApplicationConfiguration(Properties props) {
+       super(fromProperties(props));
+       setFluoAppName(props.getProperty(FLUO_APP_NAME));
+       setFluoTableName(props.getProperty(FLUO_TABLE_NAME));
+       setBootStrapServers(props.getProperty(KAFKA_BOOTSTRAP_SERVERS));
+       setNotificationClientId(props.getProperty(NOTIFICATION_CLIENT_ID, 
"consumer0"));
+       setNotificationTopic(props.getProperty(NOTIFICATION_TOPIC, 
"notifications"));
+       setNotificationGroupId(props.getProperty(NOTIFICATION_GROUP_ID, 
"group0"));
+       setProducerThreads(Integer.parseInt(props.getProperty(PRODUCER_THREADS, 
"1")));
+       
setProcessorThreads(Integer.parseInt(props.getProperty(PROCESSOR_THREADS, 
"1")));
+       setExporterThreads(Integer.parseInt(props.getProperty(EXPORTER_THREADS, 
"1")));
+       setPrunerThreads(Integer.parseInt(props.getProperty(PRUNER_THREADS, 
"1")));
+       
setCoordinatorThreads(Integer.parseInt(props.getProperty(COORDINATOR_THREADS, 
"1")));
+    }
+    
+    /**
+     * Sets the name of the Fluo Application
+     * @param fluoAppName 
+     */
+    public void setFluoAppName(String fluoAppName) {
+        set(FLUO_APP_NAME, Preconditions.checkNotNull(fluoAppName));
+    }
+    
+    /**
+     * Sets the name of the Fluo table
+     * @param fluoTableName
+     */
+    public void setFluoTableName(String fluoTableName) {
+       set(FLUO_TABLE_NAME, Preconditions.checkNotNull(fluoTableName)); 
+    }
+    
+    /**
+     * Sets the Kafka bootstrap servers
+     * @param bootStrapServers
+     */
+    public void setBootStrapServers(String bootStrapServers) {
+        set(KAFKA_BOOTSTRAP_SERVERS, 
Preconditions.checkNotNull(bootStrapServers)); 
+    }
+    
+    /**
+     * Sets the Kafka topic name for new notification requests
+     * @param notificationTopic
+     */
+    public void setNotificationTopic(String notificationTopic) {
+        set(NOTIFICATION_TOPIC, Preconditions.checkNotNull(notificationTopic));
+    }
+    
+    /**
+     * Sets the GroupId for new notification request topic
+     * @param notificationGroupId
+     */
+    public void setNotificationGroupId(String notificationGroupId) {
+        set(NOTIFICATION_GROUP_ID, 
Preconditions.checkNotNull(notificationGroupId));
+    }
+    
+    /**
+     * Sets the ClientId for the Kafka notification topic
+     * @param notificationClientId
+     */
+    public void setNotificationClientId(String notificationClientId) {
+        set(NOTIFICATION_GROUP_ID, 
Preconditions.checkNotNull(notificationClientId));
+    }
+    
+    /**
+     * Sets the number of threads for the coordinator
+     * @param threads
+     */
+    public void setCoordinatorThreads(int threads) {
+        setInt(COORDINATOR_THREADS, threads);
+    }
+    
+    /**
+     * Sets the number of threads for the exporter
+     * @param threads
+     */
+    public void setExporterThreads(int threads) {
+        setInt(EXPORTER_THREADS, threads);
+    }
+    
+    /**
+     * Sets the number of threads for the producer for reading new periodic 
notifications
+     * @param threads
+     */
+    public void setProducerThreads(int threads) {
+        setInt(PRODUCER_THREADS, threads);
+    }
+    
+    /**
+     * Sets the number of threads for the bin pruner
+     * @param threads
+     */
+    public void setPrunerThreads(int threads) {
+        setInt(PRUNER_THREADS, threads);
+    }
+    
+    /**
+     * Sets the number of threads for the Notification processor
+     * @param threads
+     */
+    public void setProcessorThreads(int threads) {
+        setInt(PROCESSOR_THREADS, threads);
+    }
+    
+    /**
+     * @return name of the Fluo application
+     */
+    public String getFluoAppName() {
+        return get(FLUO_APP_NAME);
+    }
+    
+    /**
+     * @return name of the Fluo table
+     */
+    public String getFluoTableName() {
+       return get(FLUO_TABLE_NAME); 
+    }
+    
+    /**
+     * @return Kafka bootstrap servers
+     */
+    public String getBootStrapServers() {
+        return get(KAFKA_BOOTSTRAP_SERVERS); 
+    }
+    
+    /**
+     * @return notification topic
+     */
+    public String getNotificationTopic() {
+        return get(NOTIFICATION_TOPIC, "notifications");
+    }
+    
+    /**
+     * @return Kafka GroupId for the notificaton topic
+     */
+    public String getNotificationGroupId() {
+        return get(NOTIFICATION_GROUP_ID, "group0");
+    }
+    
+    /**
+     * @return Kafka ClientId for the notification topic
+     */
+    public String getNotificationClientId() {
+        return get(NOTIFICATION_CLIENT_ID, "consumer0");
+    }
+    
+    /**
+     * @return the number of threads for the coordinator
+     */
+    public int getCoordinatorThreads() {
+        return getInt(COORDINATOR_THREADS, 1);
+    }
+    
+    /**
+     * @return the number of threads for the exporter
+     */
+    public int getExporterThreads() {
+        return getInt(EXPORTER_THREADS, 1);
+    }
+    
+    /**
+     * @return the number of threads for the notification producer
+     */
+    public int getProducerThreads() {
+        return getInt(PRODUCER_THREADS, 1);
+    }
+    
+    /**
+     * @return the number of threads for the bin pruner
+     */
+    public int getPrunerThreads() {
+        return getInt(PRUNER_THREADS, 1);
+    }
+    
+    /**
+     * @return number of threads for the processor
+     */
+    public int getProcessorThreads() {
+        return getInt(PROCESSOR_THREADS, 1);
+    }
+    
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationFactory.java
----------------------------------------------------------------------
diff --git 
a/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationFactory.java
 
b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationFactory.java
new file mode 100644
index 0000000..771a4ab
--- /dev/null
+++ 
b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationFactory.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.application;
+
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.fluo.api.client.FluoClient;
+import org.apache.fluo.api.client.Snapshot;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.rya.indexing.pcj.fluo.app.util.FluoClientFactory;
+import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
+import 
org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPeriodicQueryResultStorage;
+import org.apache.rya.periodic.notification.api.BindingSetRecord;
+import org.apache.rya.periodic.notification.api.NodeBin;
+import 
org.apache.rya.periodic.notification.api.NotificationCoordinatorExecutor;
+import 
org.apache.rya.periodic.notification.coordinator.PeriodicNotificationCoordinatorExecutor;
+import org.apache.rya.periodic.notification.exporter.KafkaExporterExecutor;
+import 
org.apache.rya.periodic.notification.notification.TimestampedNotification;
+import 
org.apache.rya.periodic.notification.processor.NotificationProcessorExecutor;
+import org.apache.rya.periodic.notification.pruner.PeriodicQueryPrunerExecutor;
+import 
org.apache.rya.periodic.notification.recovery.PeriodicNotificationProvider;
+import 
org.apache.rya.periodic.notification.registration.kafka.KafkaNotificationProvider;
+import org.apache.rya.periodic.notification.serialization.BindingSetSerDe;
+import 
org.apache.rya.periodic.notification.serialization.CommandNotificationSerializer;
+import org.openrdf.query.BindingSet;
+
+/**
+ * Factory for creating a {@link PeriodicNotificationApplication}.
+ */
+public class PeriodicNotificationApplicationFactory {
+
+    /**
+     * Create a PeriodicNotificationApplication.
+     * @param props - Properties file that specifies the parameters needed to 
create the application
+     * @return PeriodicNotificationApplication to periodically poll Rya Fluo 
for new results
+     * @throws PeriodicApplicationException
+     */
+    public static PeriodicNotificationApplication 
getPeriodicApplication(Properties props) throws PeriodicApplicationException {
+        PeriodicNotificationApplicationConfiguration conf = new 
PeriodicNotificationApplicationConfiguration(props);
+        Properties kafkaProps = getKafkaProperties(conf);
+
+        BlockingQueue<TimestampedNotification> notifications = new 
LinkedBlockingQueue<>();
+        BlockingQueue<NodeBin> bins = new LinkedBlockingQueue<>();
+        BlockingQueue<BindingSetRecord> bindingSets = new 
LinkedBlockingQueue<>();
+
+        FluoClient fluo = null;
+        try {
+            PeriodicQueryResultStorage storage = 
getPeriodicQueryResultStorage(conf);
+            fluo = FluoClientFactory.getFluoClient(conf.getFluoAppName(), 
Optional.of(conf.getFluoTableName()), conf);
+            NotificationCoordinatorExecutor coordinator = 
getCoordinator(conf.getCoordinatorThreads(), notifications);
+            addRegisteredNotices(coordinator, fluo.newSnapshot());
+            KafkaExporterExecutor exporter = 
getExporter(conf.getExporterThreads(), kafkaProps, bindingSets);
+            PeriodicQueryPrunerExecutor pruner = getPruner(storage, fluo, 
conf.getPrunerThreads(), bins);
+            NotificationProcessorExecutor processor = getProcessor(storage, 
notifications, bins, bindingSets, conf.getProcessorThreads());
+            KafkaNotificationProvider provider = 
getProvider(conf.getProducerThreads(), conf.getNotificationTopic(), 
coordinator, kafkaProps);
+            return 
PeriodicNotificationApplication.builder().setCoordinator(coordinator).setProvider(provider).setExporter(exporter)
+                    .setProcessor(processor).setPruner(pruner).build();
+        } catch (AccumuloException | AccumuloSecurityException e) {
+            throw new PeriodicApplicationException(e.getMessage());
+        } 
+    }
+    
+    private static void addRegisteredNotices(NotificationCoordinatorExecutor 
coord, Snapshot sx) {
+        coord.start();
+        PeriodicNotificationProvider provider = new 
PeriodicNotificationProvider();
+        provider.processRegisteredNotifications(coord, sx);
+    }
+
+    private static NotificationCoordinatorExecutor getCoordinator(int 
numThreads, BlockingQueue<TimestampedNotification> notifications) {
+        return new PeriodicNotificationCoordinatorExecutor(numThreads, 
notifications);
+    }
+
+    private static KafkaExporterExecutor getExporter(int numThreads, 
Properties props, BlockingQueue<BindingSetRecord> bindingSets) {
+        KafkaProducer<String, BindingSet> producer = new 
KafkaProducer<>(props, new StringSerializer(), new BindingSetSerDe());
+        return new KafkaExporterExecutor(producer, numThreads, bindingSets);
+    }
+
+    private static PeriodicQueryPrunerExecutor 
getPruner(PeriodicQueryResultStorage storage, FluoClient fluo, int numThreads,
+            BlockingQueue<NodeBin> bins) {
+        return new PeriodicQueryPrunerExecutor(storage, fluo, numThreads, 
bins);
+    }
+
+    private static NotificationProcessorExecutor 
getProcessor(PeriodicQueryResultStorage periodicStorage,
+            BlockingQueue<TimestampedNotification> notifications, 
BlockingQueue<NodeBin> bins, BlockingQueue<BindingSetRecord> bindingSets,
+            int numThreads) {
+        return new NotificationProcessorExecutor(periodicStorage, 
notifications, bins, bindingSets, numThreads);
+    }
+
+    private static KafkaNotificationProvider getProvider(int numThreads, 
String topic, NotificationCoordinatorExecutor coord,
+            Properties props) {
+        return new KafkaNotificationProvider(topic, new StringDeserializer(), 
new CommandNotificationSerializer(), props, coord,
+                numThreads);
+    }
+
+    private static PeriodicQueryResultStorage 
getPeriodicQueryResultStorage(PeriodicNotificationApplicationConfiguration conf)
+            throws AccumuloException, AccumuloSecurityException {
+        Instance instance = new ZooKeeperInstance(conf.getAccumuloInstance(), 
conf.getAccumuloZookeepers());
+        Connector conn = instance.getConnector(conf.getAccumuloUser(), new 
PasswordToken(conf.getAccumuloPassword()));
+        String ryaInstance = conf.getTablePrefix();
+        return new AccumuloPeriodicQueryResultStorage(conn, ryaInstance);
+    }
+    
+    private static Properties 
getKafkaProperties(PeriodicNotificationApplicationConfiguration conf) { 
+        Properties kafkaProps = new Properties();
+        kafkaProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
conf.getBootStrapServers());
+        kafkaProps.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, 
conf.getNotificationClientId());
+        kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, 
conf.getNotificationGroupId());
+        kafkaProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
"earliest");
+        return kafkaProps;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/coordinator/PeriodicNotificationCoordinatorExecutor.java
----------------------------------------------------------------------
diff --git 
a/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/coordinator/PeriodicNotificationCoordinatorExecutor.java
 
b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/coordinator/PeriodicNotificationCoordinatorExecutor.java
new file mode 100644
index 0000000..0486244
--- /dev/null
+++ 
b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/coordinator/PeriodicNotificationCoordinatorExecutor.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.coordinator;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.rya.periodic.notification.api.Notification;
+import 
org.apache.rya.periodic.notification.api.NotificationCoordinatorExecutor;
+import org.apache.rya.periodic.notification.api.NotificationProcessor;
+import org.apache.rya.periodic.notification.notification.CommandNotification;
+import org.apache.rya.periodic.notification.notification.PeriodicNotification;
+import 
org.apache.rya.periodic.notification.notification.TimestampedNotification;
+import 
org.apache.rya.periodic.notification.notification.CommandNotification.Command;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Implementation of {@link NotificationCoordinatorExecutor} that generates 
regular notifications
+ * as indicated by {@link PeriodicNotification}s that are registered with this 
Object. When notifications
+ * are generated they are placed on a work queue to be processed by the {@link 
NotificationProcessor}.
+ *
+ */
+public class PeriodicNotificationCoordinatorExecutor implements 
NotificationCoordinatorExecutor {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(PeriodicNotificationCoordinatorExecutor.class);
+    private int numThreads;
+    private ScheduledExecutorService producerThreadPool;
+    private Map<String, ScheduledFuture<?>> serviceMap = new HashMap<>();
+    private BlockingQueue<TimestampedNotification> notifications;
+    private final ReentrantLock lock = new ReentrantLock(true);
+    private boolean running = false;
+
+    public PeriodicNotificationCoordinatorExecutor(int numThreads, 
BlockingQueue<TimestampedNotification> notifications) {
+        this.numThreads = numThreads;
+        this.notifications = notifications;
+    }
+
+    @Override
+    public void processNextCommandNotification(CommandNotification 
notification) {
+        lock.lock();
+        try {
+            processNotification(notification);
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    @Override
+    public void start() {
+        if (!running) {
+            producerThreadPool = Executors.newScheduledThreadPool(numThreads);
+            running = true;
+        }
+    }
+
+    @Override
+    public void stop() {
+
+        if (producerThreadPool != null) {
+            producerThreadPool.shutdown();
+        }
+
+        running = false;
+
+        try {
+            if (!producerThreadPool.awaitTermination(5000, 
TimeUnit.MILLISECONDS)) {
+                producerThreadPool.shutdownNow();
+            }
+        } catch (Exception e) {
+            LOG.info("Service Executor Shutdown has been called.  Terminating 
NotificationRunnable");
+        }
+    }
+
+    private void processNotification(CommandNotification notification) {
+        Command command = notification.getCommand();
+        Notification periodic = notification.getNotification();
+        switch (command) {
+        case ADD:
+            addNotification(periodic);
+            break;
+        case DELETE:
+            deleteNotification(periodic);
+            break;
+        }
+    }
+
+    private void addNotification(Notification notification) {
+        Preconditions.checkArgument(notification instanceof 
PeriodicNotification);
+        PeriodicNotification notify = (PeriodicNotification) notification;
+        if (!serviceMap.containsKey(notification.getId())) {
+            ScheduledFuture<?> future = 
producerThreadPool.scheduleAtFixedRate(new NotificationProducer(notify), 
notify.getInitialDelay(),
+                    notify.getPeriod(), notify.getTimeUnit());
+            serviceMap.put(notify.getId(), future);
+        }
+    }
+
+    private boolean deleteNotification(Notification notification) {
+        if (serviceMap.containsKey(notification.getId())) {
+            ScheduledFuture<?> future = 
serviceMap.remove(notification.getId());
+            future.cancel(true);
+            return true;
+        }
+        return false;
+    }
+
+    /**
+     * Scheduled Task that places a {@link PeriodicNotification}
+     * in the work queue at regular intervals. 
+     *
+     */
+    class NotificationProducer implements Runnable {
+
+        private PeriodicNotification notification;
+
+        public NotificationProducer(PeriodicNotification notification) {
+            this.notification = notification;
+        }
+
+        public void run() {
+            try {
+                notifications.put(new TimestampedNotification(notification));
+            } catch (InterruptedException e) {
+                LOG.info("Unable to add notification.  Process interrupted. ");
+                throw new RuntimeException(e);
+            }
+        }
+
+    }
+
+    @Override
+    public boolean currentlyRunning() {
+        return running;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/exporter/KafkaExporterExecutor.java
----------------------------------------------------------------------
diff --git 
a/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/exporter/KafkaExporterExecutor.java
 
b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/exporter/KafkaExporterExecutor.java
new file mode 100644
index 0000000..c2e5ebf
--- /dev/null
+++ 
b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/exporter/KafkaExporterExecutor.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.exporter;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.log4j.Logger;
+import org.apache.rya.periodic.notification.api.BindingSetExporter;
+import org.apache.rya.periodic.notification.api.BindingSetRecord;
+import org.apache.rya.periodic.notification.api.LifeCycle;
+import org.openrdf.query.BindingSet;
+
+import jline.internal.Preconditions;
+
+/**
+ * Executor service that runs {@link KafkaPeriodicBindingSetExporter}s.  
+ *
+ */
+public class KafkaExporterExecutor implements LifeCycle {
+
+    private static final Logger log = 
Logger.getLogger(BindingSetExporter.class);
+    private KafkaProducer<String, BindingSet> producer;
+    private BlockingQueue<BindingSetRecord> bindingSets;
+    private ExecutorService executor;
+    private List<KafkaPeriodicBindingSetExporter> exporters;
+    private int num_Threads;
+    private boolean running = false;
+
+    /**
+     * Creates a KafkaExporterExecutor for exporting periodic query results to 
Kafka.
+     * @param producer for publishing results to Kafka
+     * @param num_Threads number of threads used to publish results
+     * @param bindingSets - work queue containing {@link BindingSet}s to be 
published
+     */
+    public KafkaExporterExecutor(KafkaProducer<String, BindingSet> producer, 
int num_Threads, BlockingQueue<BindingSetRecord> bindingSets) {
+        Preconditions.checkNotNull(producer);
+        Preconditions.checkNotNull(bindingSets);
+        this.producer = producer;
+        this.bindingSets = bindingSets;
+        this.num_Threads = num_Threads;
+        this.exporters = new ArrayList<>();
+    }
+
+    @Override
+    public void start() {
+        if (!running) {
+            executor = Executors.newFixedThreadPool(num_Threads);
+
+            for (int threadNumber = 0; threadNumber < num_Threads; 
threadNumber++) {
+                log.info("Creating exporter:" + threadNumber);
+                KafkaPeriodicBindingSetExporter exporter = new 
KafkaPeriodicBindingSetExporter(producer, threadNumber, bindingSets);
+                exporters.add(exporter);
+                executor.submit(exporter);
+            }
+            running = true;
+        }
+    }
+
+    @Override
+    public void stop() {
+        if (executor != null) {
+            executor.shutdown();
+        }
+
+        if (exporters != null && exporters.size() > 0) {
+            exporters.forEach(x -> x.shutdown());
+        }
+
+        if (producer != null) {
+            producer.close();
+        }
+
+        running = false;
+        try {
+            if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
+                log.info("Timed out waiting for consumer threads to shut down, 
exiting uncleanly");
+                executor.shutdownNow();
+            }
+        } catch (InterruptedException e) {
+            log.info("Interrupted during shutdown, exiting uncleanly");
+        }
+    }
+
+    @Override
+    public boolean currentlyRunning() {
+        return running;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/exporter/KafkaPeriodicBindingSetExporter.java
----------------------------------------------------------------------
diff --git 
a/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/exporter/KafkaPeriodicBindingSetExporter.java
 
b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/exporter/KafkaPeriodicBindingSetExporter.java
new file mode 100644
index 0000000..8a0322f
--- /dev/null
+++ 
b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/exporter/KafkaPeriodicBindingSetExporter.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.exporter;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.log4j.Logger;
+import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants;
+import org.apache.rya.periodic.notification.api.BindingSetExporter;
+import org.apache.rya.periodic.notification.api.BindingSetRecord;
+import 
org.apache.rya.periodic.notification.api.BindingSetRecordExportException;
+import org.openrdf.model.Literal;
+import org.openrdf.query.BindingSet;
+
+import jline.internal.Preconditions;
+
+/**
+ * Object that exports {@link BindingSet}s to the Kafka topic indicated by
+ * the {@link BindingSetRecord}.
+ * 
+ */
+public class KafkaPeriodicBindingSetExporter implements BindingSetExporter, 
Runnable {
+
+    private static final Logger log = 
Logger.getLogger(BindingSetExporter.class);
+    private KafkaProducer<String, BindingSet> producer;
+    private BlockingQueue<BindingSetRecord> bindingSets;
+    private AtomicBoolean closed = new AtomicBoolean(false);
+    private int threadNumber;
+
+    public KafkaPeriodicBindingSetExporter(KafkaProducer<String, BindingSet> 
producer, int threadNumber,
+            BlockingQueue<BindingSetRecord> bindingSets) {
+        Preconditions.checkNotNull(producer);
+        Preconditions.checkNotNull(bindingSets);
+        this.threadNumber = threadNumber;
+        this.producer = producer;
+        this.bindingSets = bindingSets;
+    }
+
+    /**
+     * Exports BindingSets to Kafka.  The BindingSet and topic are extracted 
from
+     * the indicated BindingSetRecord and the BindingSet is then exported to 
the topic.
+     */
+    @Override
+    public void exportNotification(BindingSetRecord record) throws 
BindingSetRecordExportException {
+        String bindingName = IncrementalUpdateConstants.PERIODIC_BIN_ID;
+        BindingSet bindingSet = record.getBindingSet();
+        String topic = record.getTopic();
+        long binId = ((Literal) bindingSet.getValue(bindingName)).longValue();
+        final Future<RecordMetadata> future = producer
+                .send(new ProducerRecord<String, BindingSet>(topic, 
Long.toString(binId), bindingSet));
+        try {
+            //wait for confirmation that results have been received
+            future.get(5, TimeUnit.SECONDS);
+        } catch (InterruptedException | ExecutionException | TimeoutException 
e) {
+            throw new BindingSetRecordExportException(e.getMessage());
+        }
+    }
+
+    @Override
+    public void run() {
+        try {
+            while (!closed.get()) {
+                exportNotification(bindingSets.take());
+            }
+        } catch (InterruptedException | BindingSetRecordExportException e) {
+            log.trace("Thread " + threadNumber + " is unable to process 
message.");
+        }
+    }
+    
+    
+    public void shutdown() {
+        closed.set(true);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/processor/NotificationProcessorExecutor.java
----------------------------------------------------------------------
diff --git 
a/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/processor/NotificationProcessorExecutor.java
 
b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/processor/NotificationProcessorExecutor.java
new file mode 100644
index 0000000..a9a5ad1
--- /dev/null
+++ 
b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/processor/NotificationProcessorExecutor.java
@@ -0,0 +1,114 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */package org.apache.rya.periodic.notification.processor;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.log4j.Logger;
+import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
+import org.apache.rya.periodic.notification.api.BindingSetRecord;
+import org.apache.rya.periodic.notification.api.LifeCycle;
+import org.apache.rya.periodic.notification.api.NodeBin;
+import 
org.apache.rya.periodic.notification.notification.TimestampedNotification;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Executor service that runs {@link TimestampedNotificationProcessor}s with 
basic
+ * functionality for starting, stopping, and determining whether notification 
processors are
+ * being executed. 
+ *
+ */
+public class NotificationProcessorExecutor implements LifeCycle {
+
+    private static final Logger log = 
Logger.getLogger(TimestampedNotificationProcessor.class);
+    private BlockingQueue<TimestampedNotification> notifications; // 
notifications
+    private BlockingQueue<NodeBin> bins; // entries to delete from Fluo
+    private BlockingQueue<BindingSetRecord> bindingSets; // query results to
+                                                         // export
+    private PeriodicQueryResultStorage periodicStorage;
+    private List<TimestampedNotificationProcessor> processors;
+    private int numberThreads;
+    private ExecutorService executor;
+    private boolean running = false;
+
+    /**
+     * Creates NotificationProcessorExecutor.
+     * @param periodicStorage - storage layer that periodic results are read 
from
+     * @param notifications - notifications are pulled from this queue, and 
the timestamp indicates which bin of results to query for
+     * @param bins - after notifications are processed, they are added to the 
bin to be deleted
+     * @param bindingSets - results read from the storage layer to be exported
+     * @param numberThreads - number of threads used for processing
+     */
+    public NotificationProcessorExecutor(PeriodicQueryResultStorage 
periodicStorage, BlockingQueue<TimestampedNotification> notifications,
+            BlockingQueue<NodeBin> bins, BlockingQueue<BindingSetRecord> 
bindingSets, int numberThreads) {
+        this.notifications = Preconditions.checkNotNull(notifications);
+        this.bins = Preconditions.checkNotNull(bins);
+        this.bindingSets = Preconditions.checkNotNull(bindingSets);
+        this.periodicStorage = periodicStorage;
+        this.numberThreads = numberThreads;
+        processors = new ArrayList<>();
+    }
+
+    @Override
+    public void start() {
+        if (!running) {
+            executor = Executors.newFixedThreadPool(numberThreads);
+            for (int threadNumber = 0; threadNumber < numberThreads; 
threadNumber++) {
+                log.info("Creating exporter:" + threadNumber);
+                TimestampedNotificationProcessor processor = 
TimestampedNotificationProcessor.builder().setBindingSets(bindingSets)
+                        
.setBins(bins).setPeriodicStorage(periodicStorage).setNotifications(notifications).setThreadNumber(threadNumber)
+                        .build();
+                processors.add(processor);
+                executor.submit(processor);
+            }
+            running = true;
+        }
+    }
+
+    @Override
+    public void stop() {
+        if (processors != null && processors.size() > 0) {
+            processors.forEach(x -> x.shutdown());
+        }
+        if (executor != null) {
+            executor.shutdown();
+        }
+        running = false;
+        try {
+            if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
+                log.info("Timed out waiting for consumer threads to shut down, 
exiting uncleanly");
+                executor.shutdownNow();
+            }
+        } catch (InterruptedException e) {
+            log.info("Interrupted during shutdown, exiting uncleanly");
+        }
+    }
+
+    @Override
+    public boolean currentlyRunning() {
+        return running;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/processor/TimestampedNotificationProcessor.java
----------------------------------------------------------------------
diff --git 
a/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/processor/TimestampedNotificationProcessor.java
 
b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/processor/TimestampedNotificationProcessor.java
new file mode 100644
index 0000000..8b65683
--- /dev/null
+++ 
b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/processor/TimestampedNotificationProcessor.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.processor;
+
+import java.util.Optional;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.log4j.Logger;
+import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
+import 
org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator;
+import org.apache.rya.periodic.notification.api.BinPruner;
+import org.apache.rya.periodic.notification.api.BindingSetRecord;
+import org.apache.rya.periodic.notification.api.NodeBin;
+import org.apache.rya.periodic.notification.api.NotificationProcessor;
+import 
org.apache.rya.periodic.notification.exporter.KafkaPeriodicBindingSetExporter;
+import 
org.apache.rya.periodic.notification.notification.TimestampedNotification;
+import org.openrdf.query.BindingSet;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Implementation of {@link NotificationProcessor} that uses the id indicated 
by
+ * the {@link TimestampedNotification} to obtain results from the
+ * {@link PeriodicQueryResultStorage} layer containing the results of the
+ * Periodic Query. The TimestampedNotificationProcessor then parses the results
+ * and adds them to work queues to be processed by the {@link BinPruner} and 
the
+ * {@link KafkaPeriodicBindingSetExporter}.
+ *
+ */
+public class TimestampedNotificationProcessor implements 
NotificationProcessor, Runnable {
+
+    private static final Logger log = 
Logger.getLogger(TimestampedNotificationProcessor.class);
+    private PeriodicQueryResultStorage periodicStorage;
+    private BlockingQueue<TimestampedNotification> notifications; // 
notifications
+                                                                  // to process
+    private BlockingQueue<NodeBin> bins; // entries to delete from Fluo
+    private BlockingQueue<BindingSetRecord> bindingSets; // query results to 
export
+    private AtomicBoolean closed = new AtomicBoolean(false);
+    private int threadNumber;
+    
+
+    public TimestampedNotificationProcessor(PeriodicQueryResultStorage 
periodicStorage,
+            BlockingQueue<TimestampedNotification> notifications, 
BlockingQueue<NodeBin> bins, BlockingQueue<BindingSetRecord> bindingSets,
+            int threadNumber) {
+        this.notifications = Preconditions.checkNotNull(notifications);
+        this.bins = Preconditions.checkNotNull(bins);
+        this.bindingSets = Preconditions.checkNotNull(bindingSets);
+        this.periodicStorage = periodicStorage;
+        this.threadNumber = threadNumber;
+    }
+
+    /**
+     * Processes the TimestampNotifications by scanning the PCJ tables for
+     * entries in the bin corresponding to
+     * {@link TimestampedNotification#getTimestamp()} and adding them to the
+     * export BlockingQueue. The TimestampNotification is then used to form a
+     * {@link NodeBin} that is passed to the BinPruner BlockingQueue so that 
the
+     * bins can be deleted from Fluo and Accumulo.
+     */
+    @Override
+    public void processNotification(TimestampedNotification notification) {
+
+        String id = notification.getId();
+        long ts = notification.getTimestamp().getTime();
+        long period = notification.getPeriod();
+        long bin = getBinFromTimestamp(ts, period);
+        NodeBin nodeBin = new NodeBin(id, bin);
+
+        try (CloseableIterator<BindingSet> iter = 
periodicStorage.listResults(id, Optional.of(bin));) {
+
+            while(iter.hasNext()) {
+                bindingSets.add(new BindingSetRecord(iter.next(), id));
+            }
+            // add NodeBin to BinPruner queue so that bin can be deleted from
+            // Fluo and Accumulo
+            bins.add(nodeBin);
+        } catch (Exception e) {
+            log.debug("Encountered error: " + e.getMessage() + " while 
accessing periodic results for bin: " + bin + " for query: " + id);
+        }
+    }
+
+    /**
+     * Computes left bin end point containing event time ts
+     * 
+     * @param ts - event time
+     * @param start - time that periodic event began
+     * @param period - length of period
+     * @return left bin end point containing event time ts
+     */
+    private long getBinFromTimestamp(long ts, long period) {
+        Preconditions.checkArgument(period > 0);
+        return (ts / period) * period;
+    }
+
+    @Override
+    public void run() {
+        try {
+            while(!closed.get()) {
+                processNotification(notifications.take());
+            }
+        } catch (Exception e) {
+            log.trace("Thread_" + threadNumber + " is unable to process next 
notification.");
+            throw new RuntimeException(e);
+        }
+
+    }
+    
+    public void shutdown() {
+        closed.set(true);
+    }
+
+    public static Builder builder() {
+        return new Builder();
+    }
+
+  
+
+    public static class Builder {
+
+        private PeriodicQueryResultStorage periodicStorage;
+        private BlockingQueue<TimestampedNotification> notifications; // 
notifications to process
+        private BlockingQueue<NodeBin> bins; // entries to delete from Fluo
+        private BlockingQueue<BindingSetRecord> bindingSets; // query results 
to export
+                                                       
+        private int threadNumber;
+
+        /**
+         * Set notification queue
+         * @param notifications - work queue containing notifications to be 
processed
+         * @return this Builder for chaining method calls
+         */
+        public Builder setNotifications(BlockingQueue<TimestampedNotification> 
notifications) {
+            this.notifications = notifications;
+            return this;
+        }
+
+        /**
+         * Set nodeBin queue
+         * @param bins - work queue containing NodeBins to be pruned
+         * @return this Builder for chaining method calls
+         */
+        public Builder setBins(BlockingQueue<NodeBin> bins) {
+            this.bins = bins;
+            return this;
+        }
+
+        /**
+         * Set BindingSet queue
+         * @param bindingSets - work queue containing BindingSets to be 
exported
+         * @return this Builder for chaining method calls
+         */
+        public Builder setBindingSets(BlockingQueue<BindingSetRecord> 
bindingSets) {
+            this.bindingSets = bindingSets;
+            return this;
+        }
+
+        /**
+         * Sets the number of threads used by this processor
+         * @param threadNumber - number of threads used by this processor
+         * @return - number of threads used by this processor
+         */
+        public Builder setThreadNumber(int threadNumber) {
+            this.threadNumber = threadNumber;
+            return this;
+        }
+        
+        /**
+         * Set the PeriodicStorage layer
+         * @param periodicStorage - periodic storage layer that periodic 
results are read from
+         * @return - this Builder for chaining method calls
+         */
+        public Builder setPeriodicStorage(PeriodicQueryResultStorage 
periodicStorage) {
+            this.periodicStorage = periodicStorage;
+            return this;
+        }
+
+        /**
+         * Builds a TimestampedNotificationProcessor
+         * @return - TimestampedNotificationProcessor built from arguments 
passed to this Builder
+         */
+        public TimestampedNotificationProcessor build() {
+            return new TimestampedNotificationProcessor(periodicStorage, 
notifications, bins, bindingSets, threadNumber);
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/pruner/AccumuloBinPruner.java
----------------------------------------------------------------------
diff --git 
a/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/pruner/AccumuloBinPruner.java
 
b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/pruner/AccumuloBinPruner.java
new file mode 100644
index 0000000..4dac64c
--- /dev/null
+++ 
b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/pruner/AccumuloBinPruner.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.pruner;
+
+import org.apache.log4j.Logger;
+import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
+import org.apache.rya.indexing.pcj.storage.PeriodicQueryStorageException;
+import org.apache.rya.periodic.notification.api.BinPruner;
+import org.apache.rya.periodic.notification.api.NodeBin;
+
+import jline.internal.Preconditions;
+
+/**
+ * Deletes BindingSets from time bins in the indicated PCJ table
+ */
+public class AccumuloBinPruner implements BinPruner {
+
+    private static final Logger log = 
Logger.getLogger(AccumuloBinPruner.class);
+    private PeriodicQueryResultStorage periodicStorage;
+
+    public AccumuloBinPruner(PeriodicQueryResultStorage periodicStorage) {
+        Preconditions.checkNotNull(periodicStorage);
+        this.periodicStorage = periodicStorage;
+    }
+
+    /**
+     * This method deletes all BindingSets in the indicated bin from the PCJ
+     * table indicated by the id. It is assumed that all BindingSet entries for
+     * the corresponding bin are written to the PCJ table so that the bin Id
+     * occurs first.
+     * 
+     * @param id
+     *            - pcj table id
+     * @param bin
+     *            - temporal bin the BindingSets are contained in
+     */
+    @Override
+    public void pruneBindingSetBin(NodeBin nodeBin) {
+        Preconditions.checkNotNull(nodeBin);
+        String id = nodeBin.getNodeId();
+        long bin = nodeBin.getBin();
+        try {
+            periodicStorage.deletePeriodicQueryResults(id, bin);
+        } catch (PeriodicQueryStorageException e) {
+            log.trace("Unable to delete results from Peroidic Table: " + id + 
" for bin: " + bin);
+            throw new RuntimeException(e);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/pruner/FluoBinPruner.java
----------------------------------------------------------------------
diff --git 
a/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/pruner/FluoBinPruner.java
 
b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/pruner/FluoBinPruner.java
new file mode 100644
index 0000000..bee9c02
--- /dev/null
+++ 
b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/pruner/FluoBinPruner.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.pruner;
+
+import org.apache.fluo.api.client.FluoClient;
+import org.apache.fluo.api.client.Transaction;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.data.Span;
+import org.apache.log4j.Logger;
+import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants;
+import org.apache.rya.indexing.pcj.fluo.app.NodeType;
+import org.apache.rya.indexing.pcj.fluo.app.batch.BatchInformationDAO;
+import org.apache.rya.indexing.pcj.fluo.app.batch.SpanBatchDeleteInformation;
+import org.apache.rya.periodic.notification.api.BinPruner;
+import org.apache.rya.periodic.notification.api.NodeBin;
+
+import com.google.common.base.Optional;
+
+/**
+ * Deletes {@link BindingSet}s from the indicated Fluo table.
+ */
+public class FluoBinPruner implements BinPruner {
+
+    private static final Logger log = Logger.getLogger(FluoBinPruner.class);
+    private FluoClient client;
+
+    public FluoBinPruner(FluoClient client) {
+        this.client = client;
+    }
+
+    /**
+     * This method deletes BindingSets in the specified bin from the BindingSet
+     * Column of the indicated Fluo nodeId
+     * 
+     * @param id
+     *            - Fluo nodeId
+     * @param bin
+     *            - bin id
+     */
+    @Override
+    public void pruneBindingSetBin(NodeBin nodeBin) {
+        String id = nodeBin.getNodeId();
+        long bin = nodeBin.getBin();
+        try (Transaction tx = client.newTransaction()) {
+            Optional<NodeType> type = NodeType.fromNodeId(id);
+            if (!type.isPresent()) {
+                log.trace("Unable to determine NodeType from id: " + id);
+                throw new RuntimeException();
+            }
+            Column batchInfoColumn = type.get().getResultColumn();
+            String batchInfoSpanPrefix = id + 
IncrementalUpdateConstants.NODEID_BS_DELIM + bin;
+            SpanBatchDeleteInformation batchInfo = 
SpanBatchDeleteInformation.builder().setColumn(batchInfoColumn)
+                    
.setSpan(Span.prefix(Bytes.of(batchInfoSpanPrefix))).build();
+            BatchInformationDAO.addBatch(tx, id, batchInfo);
+            tx.commit();
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/pruner/PeriodicQueryPruner.java
----------------------------------------------------------------------
diff --git 
a/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/pruner/PeriodicQueryPruner.java
 
b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/pruner/PeriodicQueryPruner.java
new file mode 100644
index 0000000..516690e
--- /dev/null
+++ 
b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/pruner/PeriodicQueryPruner.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.pruner;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.fluo.api.client.FluoClient;
+import org.apache.fluo.api.client.Snapshot;
+import org.apache.fluo.api.client.SnapshotBase;
+import org.apache.log4j.Logger;
+import org.apache.rya.indexing.pcj.fluo.app.NodeType;
+import org.apache.rya.indexing.pcj.fluo.app.util.PeriodicQueryUtil;
+import org.apache.rya.periodic.notification.api.BinPruner;
+import org.apache.rya.periodic.notification.api.NodeBin;
+
+import jline.internal.Preconditions;
+
+/**
+ * Implementation of {@link BinPruner} that deletes old, already processed
+ * Periodic Query results from Fluo and the PCJ table to which the Fluo results
+ * are exported.
+ *
+ */
+public class PeriodicQueryPruner implements BinPruner, Runnable {
+
+    private static final Logger log = 
Logger.getLogger(PeriodicQueryPruner.class);
+    private FluoClient client;
+    private AccumuloBinPruner accPruner;
+    private FluoBinPruner fluoPruner;
+    private BlockingQueue<NodeBin> bins;
+    private AtomicBoolean closed = new AtomicBoolean(false);
+    private int threadNumber;
+
+    public PeriodicQueryPruner(FluoBinPruner fluoPruner, AccumuloBinPruner 
accPruner, FluoClient client, BlockingQueue<NodeBin> bins, int threadNumber) {
+        this.fluoPruner = Preconditions.checkNotNull(fluoPruner);
+        this.accPruner = Preconditions.checkNotNull(accPruner);
+        this.client = Preconditions.checkNotNull(client);
+        this.bins = Preconditions.checkNotNull(bins);
+        this.threadNumber = threadNumber;
+    }
+    
+    @Override
+    public void run() {
+        try {
+            while (!closed.get()) {
+                pruneBindingSetBin(bins.take());
+            }
+        } catch (InterruptedException e) {
+            log.trace("Thread " + threadNumber + " is unable to prune the next 
message.");
+            throw new RuntimeException(e);
+        }
+    }
+    
+    /**
+     * Prunes BindingSet bins from the Rya Fluo Application in addition to the 
BindingSet
+     * bins created in the PCJ tables associated with the give query id.
+     * @param id - QueryResult Id for the Rya Fluo application 
+     * @param bin - bin id for bins to be deleted
+     */
+    @Override
+    public void pruneBindingSetBin(NodeBin nodeBin) {
+        String pcjId = nodeBin.getNodeId();
+        long bin = nodeBin.getBin();
+        try(Snapshot sx = client.newSnapshot()) {
+            String queryId = NodeType.generateNewIdForType(NodeType.QUERY, 
pcjId);
+            Set<String> fluoIds = getNodeIdsFromResultId(sx, queryId);
+            accPruner.pruneBindingSetBin(nodeBin);
+            for(String fluoId: fluoIds) {
+                fluoPruner.pruneBindingSetBin(new NodeBin(fluoId, bin));
+            }
+        } catch (Exception e) {
+            log.trace("Could not successfully initialize 
PeriodicQueryBinPruner.");
+        }
+    }
+    
+    
+    public void shutdown() {
+        closed.set(true);
+    }
+
+    private Set<String> getNodeIdsFromResultId(SnapshotBase sx, String id) {
+        Set<String> ids = new HashSet<>();
+        PeriodicQueryUtil.getPeriodicQueryNodeAncestorIds(sx, id, ids);
+        return ids;
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/pruner/PeriodicQueryPrunerExecutor.java
----------------------------------------------------------------------
diff --git 
a/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/pruner/PeriodicQueryPrunerExecutor.java
 
b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/pruner/PeriodicQueryPrunerExecutor.java
new file mode 100644
index 0000000..1c11f96
--- /dev/null
+++ 
b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/pruner/PeriodicQueryPrunerExecutor.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.pruner;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.fluo.api.client.FluoClient;
+import org.apache.log4j.Logger;
+import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
+import org.apache.rya.periodic.notification.api.LifeCycle;
+import org.apache.rya.periodic.notification.api.NodeBin;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Executor service that runs {@link PeriodicQueryPruner}s with added 
functionality
+ * for starting, stopping, and determining if the query pruners are running.
+ */
+public class PeriodicQueryPrunerExecutor implements LifeCycle {
+
+    private static final Logger log = 
Logger.getLogger(PeriodicQueryPrunerExecutor.class);
+    private FluoClient client;
+    private int numThreads;
+    private ExecutorService executor;
+    private BlockingQueue<NodeBin> bins;
+    private PeriodicQueryResultStorage periodicStorage;
+    private List<PeriodicQueryPruner> pruners;
+    private boolean running = false;
+
+    public PeriodicQueryPrunerExecutor(PeriodicQueryResultStorage 
periodicStorage, FluoClient client, int numThreads,
+            BlockingQueue<NodeBin> bins) {
+        Preconditions.checkArgument(numThreads > 0);
+        this.periodicStorage = periodicStorage;
+        this.numThreads = numThreads;
+        executor = Executors.newFixedThreadPool(numThreads);
+        this.bins = bins;
+        this.client = client;
+        this.pruners = new ArrayList<>();
+    }
+
+    @Override
+    public void start() {
+        if (!running) {
+            AccumuloBinPruner accPruner = new 
AccumuloBinPruner(periodicStorage);
+            FluoBinPruner fluoPruner = new FluoBinPruner(client);
+
+            for (int threadNumber = 0; threadNumber < numThreads; 
threadNumber++) {
+                PeriodicQueryPruner pruner = new 
PeriodicQueryPruner(fluoPruner, accPruner, client, bins, threadNumber);
+                pruners.add(pruner);
+                executor.submit(pruner);
+            }
+            running = true;
+        }
+    }
+
+    @Override
+    public void stop() {
+        if (pruners != null && pruners.size() > 0) {
+            pruners.forEach(x -> x.shutdown());
+        }
+        if(client != null) {
+            client.close();
+        }
+        if (executor != null) {
+            executor.shutdown();
+            running = false;
+        }
+        try {
+            if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
+                log.info("Timed out waiting for consumer threads to shut down, 
exiting uncleanly");
+                executor.shutdownNow();
+            }
+        } catch (InterruptedException e) {
+            log.info("Interrupted during shutdown, exiting uncleanly");
+        }
+    }
+
+    @Override
+    public boolean currentlyRunning() {
+        return running;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/recovery/PeriodicNotificationProvider.java
----------------------------------------------------------------------
diff --git 
a/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/recovery/PeriodicNotificationProvider.java
 
b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/recovery/PeriodicNotificationProvider.java
new file mode 100644
index 0000000..69bd39c
--- /dev/null
+++ 
b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/recovery/PeriodicNotificationProvider.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.recovery;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.fluo.api.client.Snapshot;
+import org.apache.fluo.api.client.scanner.ColumnScanner;
+import org.apache.fluo.api.client.scanner.RowScanner;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.ColumnValue;
+import org.apache.fluo.api.data.Span;
+import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants;
+import org.apache.rya.indexing.pcj.fluo.app.NodeType;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO;
+import org.apache.rya.indexing.pcj.fluo.app.query.PeriodicQueryMetadata;
+import org.apache.rya.indexing.pcj.fluo.app.util.FluoQueryUtils;
+import 
org.apache.rya.periodic.notification.api.NotificationCoordinatorExecutor;
+import 
org.apache.rya.periodic.notification.coordinator.PeriodicNotificationCoordinatorExecutor;
+import org.apache.rya.periodic.notification.notification.CommandNotification;
+import 
org.apache.rya.periodic.notification.notification.CommandNotification.Command;
+import org.apache.rya.periodic.notification.notification.PeriodicNotification;
+
+/**
+ * This class is used by the {@link PeriodicNotificationCoordinatorExecutor}
+ * to add all existing {@link PeriodicNotification}s stored in Fluo when it is
+ * initialized.  This enables the the {@link PeriodicServiceApplication} to be 
+ * recovered from failure by restoring it original state.
+ *
+ */
+public class PeriodicNotificationProvider {
+
+    private FluoQueryMetadataDAO dao;
+    
+    public PeriodicNotificationProvider() {
+        this.dao = new FluoQueryMetadataDAO();
+    }
+    
+    /**
+     * Retrieve all of the information about Periodic Query results already 
registered
+     * with Fluo.  This is returned in the form of {@link 
CommandNotification}s that
+     * can be registered with the {@link NotificationCoordinatorExecutor}.
+     * @param sx - snapshot for reading results from Fluo
+     * @return - collection of CommandNotifications that indicate Periodic 
Query information registered with system
+     */
+    public Collection<CommandNotification> getNotifications(Snapshot sx) {
+        Set<PeriodicQueryMetadata> periodicMetadata = new HashSet<>();
+        RowScanner scanner = 
sx.scanner().fetch(FluoQueryColumns.PERIODIC_QUERY_NODE_ID)
+                
.over(Span.prefix(IncrementalUpdateConstants.PERIODIC_QUERY_PREFIX)).byRow().build();
+        Iterator<ColumnScanner> colScannerIter = scanner.iterator();
+        while (colScannerIter.hasNext()) {
+            ColumnScanner colScanner = colScannerIter.next();
+            Iterator<ColumnValue> values = colScanner.iterator();
+            while (values.hasNext()) {
+                PeriodicQueryMetadata metadata = 
dao.readPeriodicQueryMetadata(sx, values.next().getsValue());
+                periodicMetadata.add(metadata);
+            }
+        }
+        return getCommandNotifications(sx, periodicMetadata);
+    }
+    
+    /**
+     * Registers all of Periodic Query information already contained within 
Fluo to the 
+     * {@link NotificationCoordinatorExecutor}.
+     * @param coordinator - coordinator that periodic info will be registered 
with
+     * @param sx - snapshot for reading results from Fluo
+     */
+    public void processRegisteredNotifications(NotificationCoordinatorExecutor 
coordinator, Snapshot sx) {
+        coordinator.start();
+        Collection<CommandNotification> notifications = getNotifications(sx);
+        for(CommandNotification notification: notifications) {
+            coordinator.processNextCommandNotification(notification);
+        }
+    }
+    
+    private Collection<CommandNotification> getCommandNotifications(Snapshot 
sx, Collection<PeriodicQueryMetadata> metadata) {
+        Set<CommandNotification> notifications = new HashSet<>();
+        int i = 1;
+        for(PeriodicQueryMetadata meta:metadata) {
+            //offset initial wait to avoid overloading system
+            PeriodicNotification periodic = new 
PeriodicNotification(getQueryId(meta.getNodeId(), sx), 
meta.getPeriod(),TimeUnit.MILLISECONDS,i*5000);
+            notifications.add(new CommandNotification(Command.ADD, periodic));
+            i++;
+        }
+        return notifications;
+    }
+    
+    private String getQueryId(String periodicNodeId, Snapshot sx) {
+        return getQueryIdFromPeriodicId(sx, periodicNodeId);
+    }
+    
+    private String getQueryIdFromPeriodicId(Snapshot sx, String nodeId) {
+        NodeType nodeType = NodeType.fromNodeId(nodeId).orNull();
+        String id = null;
+        switch (nodeType) {
+        case FILTER:
+            id = getQueryIdFromPeriodicId(sx, sx.get(Bytes.of(nodeId), 
FluoQueryColumns.FILTER_PARENT_NODE_ID).toString());
+            break;
+        case PERIODIC_QUERY:
+            id = getQueryIdFromPeriodicId(sx, sx.get(Bytes.of(nodeId), 
FluoQueryColumns.PERIODIC_QUERY_PARENT_NODE_ID).toString());
+            break;
+        case QUERY:
+            id = FluoQueryUtils.convertFluoQueryIdToPcjId(nodeId);
+            break;
+        case AGGREGATION: 
+            id = getQueryIdFromPeriodicId(sx, sx.get(Bytes.of(nodeId), 
FluoQueryColumns.AGGREGATION_PARENT_NODE_ID).toString());
+            break;
+        case CONSTRUCT:
+            id = getQueryIdFromPeriodicId(sx, sx.get(Bytes.of(nodeId), 
FluoQueryColumns.CONSTRUCT_PARENT_NODE_ID).toString());
+            break;
+        case PROJECTION:
+            id = getQueryIdFromPeriodicId(sx, sx.get(Bytes.of(nodeId), 
FluoQueryColumns.PROJECTION_PARENT_NODE_ID).toString());
+            break;
+        default:
+            throw new IllegalArgumentException("Invalid node type");
+        
+        }
+        return id;
+    }
+    
+}

Reply via email to