http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/exporter/KafkaExporterExecutor.java
----------------------------------------------------------------------
diff --git 
a/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/exporter/KafkaExporterExecutor.java
 
b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/exporter/KafkaExporterExecutor.java
index 3b639e9..815a794 100644
--- 
a/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/exporter/KafkaExporterExecutor.java
+++ 
b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/exporter/KafkaExporterExecutor.java
@@ -20,57 +20,54 @@ package org.apache.rya.periodic.notification.exporter;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Objects;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.log4j.Logger;
-import org.apache.rya.periodic.notification.api.BindingSetExporter;
 import org.apache.rya.periodic.notification.api.BindingSetRecord;
 import org.apache.rya.periodic.notification.api.LifeCycle;
 import org.openrdf.query.BindingSet;
-
-import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
- * Executor service that runs {@link KafkaPeriodicBindingSetExporter}s.  
+ * Executor service that runs {@link KafkaPeriodicBindingSetExporter}s.
  *
  */
 public class KafkaExporterExecutor implements LifeCycle {
 
-    private static final Logger log = 
Logger.getLogger(BindingSetExporter.class);
-    private KafkaProducer<String, BindingSet> producer;
-    private BlockingQueue<BindingSetRecord> bindingSets;
+    private static final Logger log = 
LoggerFactory.getLogger(KafkaExporterExecutor.class);
+    private final KafkaProducer<String, BindingSet> producer;
+    private final BlockingQueue<BindingSetRecord> bindingSets;
     private ExecutorService executor;
-    private List<KafkaPeriodicBindingSetExporter> exporters;
-    private int num_Threads;
+    private final List<KafkaPeriodicBindingSetExporter> exporters;
+    private final int numThreads;
     private boolean running = false;
 
     /**
      * Creates a KafkaExporterExecutor for exporting periodic query results to 
Kafka.
      * @param producer for publishing results to Kafka
-     * @param num_Threads number of threads used to publish results
+     * @param numThreads number of threads used to publish results
      * @param bindingSets - work queue containing {@link BindingSetRecord}s to be 
published
      */
-    public KafkaExporterExecutor(KafkaProducer<String, BindingSet> producer, 
int num_Threads, BlockingQueue<BindingSetRecord> bindingSets) {
-        Preconditions.checkNotNull(producer);
-        Preconditions.checkNotNull(bindingSets);
-        this.producer = producer;
-        this.bindingSets = bindingSets;
-        this.num_Threads = num_Threads;
+    public KafkaExporterExecutor(final KafkaProducer<String, BindingSet> 
producer, final int numThreads, final BlockingQueue<BindingSetRecord> 
bindingSets) {
+        this.producer = Objects.requireNonNull(producer);
+        this.bindingSets = Objects.requireNonNull(bindingSets);
+        this.numThreads = numThreads;
         this.exporters = new ArrayList<>();
     }
 
     @Override
     public void start() {
         if (!running) {
-            executor = Executors.newFixedThreadPool(num_Threads);
+            executor = Executors.newFixedThreadPool(numThreads);
 
-            for (int threadNumber = 0; threadNumber < num_Threads; 
threadNumber++) {
-                log.info("Creating exporter:" + threadNumber);
-                KafkaPeriodicBindingSetExporter exporter = new 
KafkaPeriodicBindingSetExporter(producer, threadNumber, bindingSets);
+            for (int threadNumber = 0; threadNumber < numThreads; 
threadNumber++) {
+                log.info("Creating exporter: {}", threadNumber);
+                final KafkaPeriodicBindingSetExporter exporter = new 
KafkaPeriodicBindingSetExporter(producer, threadNumber, bindingSets);
                 exporters.add(exporter);
                 executor.submit(exporter);
             }
@@ -98,7 +95,7 @@ public class KafkaExporterExecutor implements LifeCycle {
                 log.info("Timed out waiting for consumer threads to shut down, 
exiting uncleanly");
                 executor.shutdownNow();
             }
-        } catch (InterruptedException e) {
+        } catch (final InterruptedException e) {
             log.info("Interrupted during shutdown, exiting uncleanly");
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/exporter/KafkaPeriodicBindingSetExporter.java
----------------------------------------------------------------------
diff --git 
a/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/exporter/KafkaPeriodicBindingSetExporter.java
 
b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/exporter/KafkaPeriodicBindingSetExporter.java
index 5397618..c343116 100644
--- 
a/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/exporter/KafkaPeriodicBindingSetExporter.java
+++ 
b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/exporter/KafkaPeriodicBindingSetExporter.java
@@ -18,46 +18,42 @@
  */
 package org.apache.rya.periodic.notification.exporter;
 
+import java.util.Objects;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
-import org.apache.log4j.Logger;
 import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants;
 import org.apache.rya.periodic.notification.api.BindingSetExporter;
 import org.apache.rya.periodic.notification.api.BindingSetRecord;
 import 
org.apache.rya.periodic.notification.api.BindingSetRecordExportException;
 import org.openrdf.model.Literal;
 import org.openrdf.query.BindingSet;
-
-import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Object that exports {@link BindingSet}s to the Kafka topic indicated by
  * the {@link BindingSetRecord}.
- * 
+ *
  */
 public class KafkaPeriodicBindingSetExporter implements BindingSetExporter, 
Runnable {
 
-    private static final Logger log = 
Logger.getLogger(BindingSetExporter.class);
-    private KafkaProducer<String, BindingSet> producer;
-    private BlockingQueue<BindingSetRecord> bindingSets;
-    private AtomicBoolean closed = new AtomicBoolean(false);
-    private int threadNumber;
+    private static final Logger log = 
LoggerFactory.getLogger(KafkaPeriodicBindingSetExporter.class);
+    private final KafkaProducer<String, BindingSet> producer;
+    private final BlockingQueue<BindingSetRecord> bindingSets;
+    private final AtomicBoolean closed = new AtomicBoolean(false);
+    private final int threadNumber;
 
-    public KafkaPeriodicBindingSetExporter(KafkaProducer<String, BindingSet> 
producer, int threadNumber,
-            BlockingQueue<BindingSetRecord> bindingSets) {
-        Preconditions.checkNotNull(producer);
-        Preconditions.checkNotNull(bindingSets);
+    public KafkaPeriodicBindingSetExporter(final KafkaProducer<String, 
BindingSet> producer, final int threadNumber,
+            final BlockingQueue<BindingSetRecord> bindingSets) {
         this.threadNumber = threadNumber;
-        this.producer = producer;
-        this.bindingSets = bindingSets;
+        this.producer = Objects.requireNonNull(producer);
+        this.bindingSets = Objects.requireNonNull(bindingSets);
     }
 
     /**
@@ -65,18 +61,21 @@ public class KafkaPeriodicBindingSetExporter implements 
BindingSetExporter, Runn
      * the indicated BindingSetRecord and the BindingSet is then exported to 
the topic.
      */
     @Override
-    public void exportNotification(BindingSetRecord record) throws 
BindingSetRecordExportException {
-        String bindingName = IncrementalUpdateConstants.PERIODIC_BIN_ID;
-        BindingSet bindingSet = record.getBindingSet();
-        String topic = record.getTopic();
-        long binId = ((Literal) bindingSet.getValue(bindingName)).longValue();
-        final Future<RecordMetadata> future = producer
-                .send(new ProducerRecord<String, BindingSet>(topic, 
Long.toString(binId), bindingSet));
+    public void exportNotification(final BindingSetRecord record) throws 
BindingSetRecordExportException {
         try {
+            log.info("Exporting {} records to Kafka to topic: {}", 
record.getBindingSet().size(), record.getTopic());
+            final String bindingName = 
IncrementalUpdateConstants.PERIODIC_BIN_ID;
+
+            final BindingSet bindingSet = record.getBindingSet();
+            final String topic = record.getTopic();
+            final long binId = ((Literal) 
bindingSet.getValue(bindingName)).longValue();
+
+            final Future<RecordMetadata> future = producer
+                .send(new ProducerRecord<String, BindingSet>(topic, 
Long.toString(binId), bindingSet));
             //wait for confirmation that results have been received
             future.get(5, TimeUnit.SECONDS);
-        } catch (InterruptedException | ExecutionException | TimeoutException 
e) {
-            throw new BindingSetRecordExportException(e.getMessage());
+        } catch (final Exception e) {  // catch all possible exceptional 
behavior and throw as our checked exception.
+            throw new BindingSetRecordExportException(e.getMessage(), e);
         }
     }
 
@@ -87,11 +86,11 @@ public class KafkaPeriodicBindingSetExporter implements 
BindingSetExporter, Runn
                 exportNotification(bindingSets.take());
             }
         } catch (InterruptedException | BindingSetRecordExportException e) {
-            log.trace("Thread " + threadNumber + " is unable to process 
message.");
+            log.warn("Thread " + threadNumber + " is unable to process 
message.", e);
         }
     }
-    
-    
+
+
     public void shutdown() {
         closed.set(true);
     }

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/processor/NotificationProcessorExecutor.java
----------------------------------------------------------------------
diff --git 
a/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/processor/NotificationProcessorExecutor.java
 
b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/processor/NotificationProcessorExecutor.java
index a9a5ad1..e589141 100644
--- 
a/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/processor/NotificationProcessorExecutor.java
+++ 
b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/processor/NotificationProcessorExecutor.java
@@ -20,36 +20,43 @@
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Objects;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.log4j.Logger;
 import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
 import org.apache.rya.periodic.notification.api.BindingSetRecord;
 import org.apache.rya.periodic.notification.api.LifeCycle;
 import org.apache.rya.periodic.notification.api.NodeBin;
 import 
org.apache.rya.periodic.notification.notification.TimestampedNotification;
-
-import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Executor service that runs {@link TimestampedNotificationProcessor}s with 
basic
  * functionality for starting, stopping, and determining whether notification 
processors are
- * being executed. 
+ * being executed.
  *
  */
 public class NotificationProcessorExecutor implements LifeCycle {
 
-    private static final Logger log = 
Logger.getLogger(TimestampedNotificationProcessor.class);
-    private BlockingQueue<TimestampedNotification> notifications; // 
notifications
-    private BlockingQueue<NodeBin> bins; // entries to delete from Fluo
-    private BlockingQueue<BindingSetRecord> bindingSets; // query results to
-                                                         // export
-    private PeriodicQueryResultStorage periodicStorage;
-    private List<TimestampedNotificationProcessor> processors;
-    private int numberThreads;
+    private static final Logger log = 
LoggerFactory.getLogger(NotificationProcessorExecutor.class);
+    private final BlockingQueue<TimestampedNotification> notifications;
+
+    /**
+     * entries to delete from Fluo
+     */
+    private final BlockingQueue<NodeBin> bins;
+
+    /**
+     * query results to export
+     */
+    private final BlockingQueue<BindingSetRecord> bindingSets;
+    private final PeriodicQueryResultStorage periodicStorage;
+    private final List<TimestampedNotificationProcessor> processors;
+    private final int numberThreads;
     private ExecutorService executor;
     private boolean running = false;
 
@@ -61,11 +68,11 @@ public class NotificationProcessorExecutor implements 
LifeCycle {
      * @param bindingSets - results read from the storage layer to be exported
      * @param numberThreads - number of threads used for processing
      */
-    public NotificationProcessorExecutor(PeriodicQueryResultStorage 
periodicStorage, BlockingQueue<TimestampedNotification> notifications,
-            BlockingQueue<NodeBin> bins, BlockingQueue<BindingSetRecord> 
bindingSets, int numberThreads) {
-        this.notifications = Preconditions.checkNotNull(notifications);
-        this.bins = Preconditions.checkNotNull(bins);
-        this.bindingSets = Preconditions.checkNotNull(bindingSets);
+    public NotificationProcessorExecutor(final PeriodicQueryResultStorage 
periodicStorage, final BlockingQueue<TimestampedNotification> notifications,
+            final BlockingQueue<NodeBin> bins, final 
BlockingQueue<BindingSetRecord> bindingSets, final int numberThreads) {
+        this.notifications = Objects.requireNonNull(notifications);
+        this.bins = Objects.requireNonNull(bins);
+        this.bindingSets = Objects.requireNonNull(bindingSets);
         this.periodicStorage = periodicStorage;
         this.numberThreads = numberThreads;
         processors = new ArrayList<>();
@@ -76,8 +83,8 @@ public class NotificationProcessorExecutor implements 
LifeCycle {
         if (!running) {
             executor = Executors.newFixedThreadPool(numberThreads);
             for (int threadNumber = 0; threadNumber < numberThreads; 
threadNumber++) {
-                log.info("Creating exporter:" + threadNumber);
-                TimestampedNotificationProcessor processor = 
TimestampedNotificationProcessor.builder().setBindingSets(bindingSets)
+                log.info("Creating processor for thread: {}", threadNumber);
+                final TimestampedNotificationProcessor processor = 
TimestampedNotificationProcessor.builder().setBindingSets(bindingSets)
                         
.setBins(bins).setPeriodicStorage(periodicStorage).setNotifications(notifications).setThreadNumber(threadNumber)
                         .build();
                 processors.add(processor);
@@ -97,11 +104,11 @@ public class NotificationProcessorExecutor implements 
LifeCycle {
         }
         running = false;
         try {
-            if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
+            if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
                 log.info("Timed out waiting for consumer threads to shut down, 
exiting uncleanly");
                 executor.shutdownNow();
             }
-        } catch (InterruptedException e) {
+        } catch (final InterruptedException e) {
             log.info("Interrupted during shutdown, exiting uncleanly");
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/processor/TimestampedNotificationProcessor.java
----------------------------------------------------------------------
diff --git 
a/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/processor/TimestampedNotificationProcessor.java
 
b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/processor/TimestampedNotificationProcessor.java
index 8b65683..ae586da 100644
--- 
a/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/processor/TimestampedNotificationProcessor.java
+++ 
b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/processor/TimestampedNotificationProcessor.java
@@ -22,7 +22,6 @@ import java.util.Optional;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import org.apache.log4j.Logger;
 import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
 import 
org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator;
 import org.apache.rya.periodic.notification.api.BinPruner;
@@ -32,6 +31,8 @@ import 
org.apache.rya.periodic.notification.api.NotificationProcessor;
 import 
org.apache.rya.periodic.notification.exporter.KafkaPeriodicBindingSetExporter;
 import 
org.apache.rya.periodic.notification.notification.TimestampedNotification;
 import org.openrdf.query.BindingSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;
 
@@ -46,19 +47,30 @@ import com.google.common.base.Preconditions;
  */
 public class TimestampedNotificationProcessor implements 
NotificationProcessor, Runnable {
 
-    private static final Logger log = 
Logger.getLogger(TimestampedNotificationProcessor.class);
-    private PeriodicQueryResultStorage periodicStorage;
-    private BlockingQueue<TimestampedNotification> notifications; // 
notifications
-                                                                  // to process
-    private BlockingQueue<NodeBin> bins; // entries to delete from Fluo
-    private BlockingQueue<BindingSetRecord> bindingSets; // query results to 
export
-    private AtomicBoolean closed = new AtomicBoolean(false);
-    private int threadNumber;
-    
-
-    public TimestampedNotificationProcessor(PeriodicQueryResultStorage 
periodicStorage,
-            BlockingQueue<TimestampedNotification> notifications, 
BlockingQueue<NodeBin> bins, BlockingQueue<BindingSetRecord> bindingSets,
-            int threadNumber) {
+    private static final Logger log = 
LoggerFactory.getLogger(TimestampedNotificationProcessor.class);
+    private final PeriodicQueryResultStorage periodicStorage;
+
+    /**
+     * notifications to process
+     */
+    private final BlockingQueue<TimestampedNotification> notifications;
+
+    /**
+     * entries to delete from Fluo
+     */
+    private final BlockingQueue<NodeBin> bins;
+
+    /**
+     * query results to export
+     */
+    private final BlockingQueue<BindingSetRecord> bindingSets;
+    private final AtomicBoolean closed = new AtomicBoolean(false);
+    private final int threadNumber;
+
+
+    public TimestampedNotificationProcessor(final PeriodicQueryResultStorage 
periodicStorage,
+            final BlockingQueue<TimestampedNotification> notifications, final 
BlockingQueue<NodeBin> bins, final BlockingQueue<BindingSetRecord> bindingSets,
+            final int threadNumber) {
         this.notifications = Preconditions.checkNotNull(notifications);
         this.bins = Preconditions.checkNotNull(bins);
         this.bindingSets = Preconditions.checkNotNull(bindingSets);
@@ -75,13 +87,13 @@ public class TimestampedNotificationProcessor implements 
NotificationProcessor,
      * bins can be deleted from Fluo and Accumulo.
      */
     @Override
-    public void processNotification(TimestampedNotification notification) {
+    public void processNotification(final TimestampedNotification 
notification) {
 
-        String id = notification.getId();
-        long ts = notification.getTimestamp().getTime();
-        long period = notification.getPeriod();
-        long bin = getBinFromTimestamp(ts, period);
-        NodeBin nodeBin = new NodeBin(id, bin);
+        final String id = notification.getId();
+        final long ts = notification.getTimestamp().getTime();
+        final long period = notification.getPeriod();
+        final long bin = getBinFromTimestamp(ts, period);
+        final NodeBin nodeBin = new NodeBin(id, bin);
 
         try (CloseableIterator<BindingSet> iter = 
periodicStorage.listResults(id, Optional.of(bin));) {
 
@@ -91,20 +103,20 @@ public class TimestampedNotificationProcessor implements 
NotificationProcessor,
             // add NodeBin to BinPruner queue so that bin can be deleted from
             // Fluo and Accumulo
             bins.add(nodeBin);
-        } catch (Exception e) {
-            log.debug("Encountered error: " + e.getMessage() + " while 
accessing periodic results for bin: " + bin + " for query: " + id);
+        } catch (final Exception e) {
+            log.warn("Encountered exception while accessing periodic results 
for bin: " + bin + " for query: " + id, e);
         }
     }
 
     /**
      * Computes left bin end point containing event time ts
-     * 
+     *
      * @param ts - event time
      * @param period - length of period; must be greater than 0, otherwise an
      *            IllegalArgumentException is thrown
      * @return left bin end point containing event time ts
      */
-    private long getBinFromTimestamp(long ts, long period) {
+    private long getBinFromTimestamp(final long ts, final long period) {
         Preconditions.checkArgument(period > 0);
         return (ts / period) * period;
     }
@@ -115,13 +127,13 @@ public class TimestampedNotificationProcessor implements 
NotificationProcessor,
             while(!closed.get()) {
                 processNotification(notifications.take());
             }
-        } catch (Exception e) {
-            log.trace("Thread_" + threadNumber + " is unable to process next 
notification.");
+        } catch (final Exception e) {
+            log.warn("Thread {} is unable to process next notification.", 
threadNumber);
             throw new RuntimeException(e);
         }
 
     }
-    
+
     public void shutdown() {
         closed.set(true);
     }
@@ -130,7 +142,7 @@ public class TimestampedNotificationProcessor implements 
NotificationProcessor,
         return new Builder();
     }
 
-  
+
 
     public static class Builder {
 
@@ -138,7 +150,7 @@ public class TimestampedNotificationProcessor implements 
NotificationProcessor,
         private BlockingQueue<TimestampedNotification> notifications; // 
notifications to process
         private BlockingQueue<NodeBin> bins; // entries to delete from Fluo
         private BlockingQueue<BindingSetRecord> bindingSets; // query results 
to export
-                                                       
+
         private int threadNumber;
 
         /**
@@ -146,7 +158,7 @@ public class TimestampedNotificationProcessor implements 
NotificationProcessor,
          * @param notifications - work queue containing notifications to be 
processed
          * @return this Builder for chaining method calls
          */
-        public Builder setNotifications(BlockingQueue<TimestampedNotification> 
notifications) {
+        public Builder setNotifications(final 
BlockingQueue<TimestampedNotification> notifications) {
             this.notifications = notifications;
             return this;
         }
@@ -156,7 +168,7 @@ public class TimestampedNotificationProcessor implements 
NotificationProcessor,
          * @param bins - work queue containing NodeBins to be pruned
          * @return this Builder for chaining method calls
          */
-        public Builder setBins(BlockingQueue<NodeBin> bins) {
+        public Builder setBins(final BlockingQueue<NodeBin> bins) {
             this.bins = bins;
             return this;
         }
@@ -166,7 +178,7 @@ public class TimestampedNotificationProcessor implements 
NotificationProcessor,
          * @param bindingSets - work queue containing BindingSets to be 
exported
          * @return this Builder for chaining method calls
          */
-        public Builder setBindingSets(BlockingQueue<BindingSetRecord> 
bindingSets) {
+        public Builder setBindingSets(final BlockingQueue<BindingSetRecord> 
bindingSets) {
             this.bindingSets = bindingSets;
             return this;
         }
@@ -176,17 +188,17 @@ public class TimestampedNotificationProcessor implements 
NotificationProcessor,
          * @param threadNumber - number identifying the thread that runs this processor
          * @return - this Builder for chaining method calls
          */
-        public Builder setThreadNumber(int threadNumber) {
+        public Builder setThreadNumber(final int threadNumber) {
             this.threadNumber = threadNumber;
             return this;
         }
-        
+
         /**
          * Set the PeriodicStorage layer
          * @param periodicStorage - periodic storage layer that periodic 
results are read from
          * @return - this Builder for chaining method calls
          */
-        public Builder setPeriodicStorage(PeriodicQueryResultStorage 
periodicStorage) {
+        public Builder setPeriodicStorage(final PeriodicQueryResultStorage 
periodicStorage) {
             this.periodicStorage = periodicStorage;
             return this;
         }

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/pruner/AccumuloBinPruner.java
----------------------------------------------------------------------
diff --git 
a/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/pruner/AccumuloBinPruner.java
 
b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/pruner/AccumuloBinPruner.java
index a9403c2..f3d5a8d 100644
--- 
a/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/pruner/AccumuloBinPruner.java
+++ 
b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/pruner/AccumuloBinPruner.java
@@ -18,25 +18,25 @@
  */
 package org.apache.rya.periodic.notification.pruner;
 
-import org.apache.log4j.Logger;
+import java.util.Objects;
+
 import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
 import org.apache.rya.indexing.pcj.storage.PeriodicQueryStorageException;
 import org.apache.rya.periodic.notification.api.BinPruner;
 import org.apache.rya.periodic.notification.api.NodeBin;
-
-import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Deletes BindingSets from time bins in the indicated PCJ table
  */
 public class AccumuloBinPruner implements BinPruner {
 
-    private static final Logger log = 
Logger.getLogger(AccumuloBinPruner.class);
-    private PeriodicQueryResultStorage periodicStorage;
+    private static final Logger log = 
LoggerFactory.getLogger(AccumuloBinPruner.class);
+    private final PeriodicQueryResultStorage periodicStorage;
 
-    public AccumuloBinPruner(PeriodicQueryResultStorage periodicStorage) {
-        Preconditions.checkNotNull(periodicStorage);
-        this.periodicStorage = periodicStorage;
+    public AccumuloBinPruner(final PeriodicQueryResultStorage periodicStorage) 
{
+        this.periodicStorage = Objects.requireNonNull(periodicStorage);
     }
 
     /**
@@ -44,20 +44,20 @@ public class AccumuloBinPruner implements BinPruner {
      * table indicated by the id. It is assumed that all BindingSet entries for
      * the corresponding bin are written to the PCJ table so that the bin Id
      * occurs first.
-     * 
+     *
      * @param nodeBin
      *            - pairs the pcj table id (nodeBin.getNodeId()) with the
      *            temporal bin (nodeBin.getBin()) whose BindingSets are
      *            deleted; must not be null
      */
     @Override
-    public void pruneBindingSetBin(NodeBin nodeBin) {
-        Preconditions.checkNotNull(nodeBin);
-        String id = nodeBin.getNodeId();
-        long bin = nodeBin.getBin();
+    public void pruneBindingSetBin(final NodeBin nodeBin) {
+        Objects.requireNonNull(nodeBin);
+        final String id = nodeBin.getNodeId();
+        final long bin = nodeBin.getBin();
         try {
             periodicStorage.deletePeriodicQueryResults(id, bin);
-        } catch (PeriodicQueryStorageException e) {
+        } catch (final PeriodicQueryStorageException e) {
             log.trace("Unable to delete results from Peroidic Table: " + id + 
" for bin: " + bin);
             throw new RuntimeException(e);
         }

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/pruner/FluoBinPruner.java
----------------------------------------------------------------------
diff --git 
a/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/pruner/FluoBinPruner.java
 
b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/pruner/FluoBinPruner.java
index bee9c02..0562180 100644
--- 
a/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/pruner/FluoBinPruner.java
+++ 
b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/pruner/FluoBinPruner.java
@@ -23,13 +23,15 @@ import org.apache.fluo.api.client.Transaction;
 import org.apache.fluo.api.data.Bytes;
 import org.apache.fluo.api.data.Column;
 import org.apache.fluo.api.data.Span;
-import org.apache.log4j.Logger;
 import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants;
 import org.apache.rya.indexing.pcj.fluo.app.NodeType;
 import org.apache.rya.indexing.pcj.fluo.app.batch.BatchInformationDAO;
 import org.apache.rya.indexing.pcj.fluo.app.batch.SpanBatchDeleteInformation;
 import org.apache.rya.periodic.notification.api.BinPruner;
 import org.apache.rya.periodic.notification.api.NodeBin;
+import org.openrdf.query.BindingSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Optional;
 
@@ -38,35 +40,35 @@ import com.google.common.base.Optional;
  */
 public class FluoBinPruner implements BinPruner {
 
-    private static final Logger log = Logger.getLogger(FluoBinPruner.class);
-    private FluoClient client;
+    private static final Logger log = 
LoggerFactory.getLogger(FluoBinPruner.class);
+    private final FluoClient client;
 
-    public FluoBinPruner(FluoClient client) {
+    public FluoBinPruner(final FluoClient client) {
         this.client = client;
     }
 
     /**
      * This method deletes BindingSets in the specified bin from the BindingSet
      * Column of the indicated Fluo nodeId
-     * 
+     *
      * @param id
      *            - Fluo nodeId
      * @param bin
      *            - bin id
      */
     @Override
-    public void pruneBindingSetBin(NodeBin nodeBin) {
-        String id = nodeBin.getNodeId();
-        long bin = nodeBin.getBin();
+    public void pruneBindingSetBin(final NodeBin nodeBin) {
+        final String id = nodeBin.getNodeId();
+        final long bin = nodeBin.getBin();
         try (Transaction tx = client.newTransaction()) {
-            Optional<NodeType> type = NodeType.fromNodeId(id);
+            final Optional<NodeType> type = NodeType.fromNodeId(id);
             if (!type.isPresent()) {
                 log.trace("Unable to determine NodeType from id: " + id);
                 throw new RuntimeException();
             }
-            Column batchInfoColumn = type.get().getResultColumn();
-            String batchInfoSpanPrefix = id + 
IncrementalUpdateConstants.NODEID_BS_DELIM + bin;
-            SpanBatchDeleteInformation batchInfo = 
SpanBatchDeleteInformation.builder().setColumn(batchInfoColumn)
+            final Column batchInfoColumn = type.get().getResultColumn();
+            final String batchInfoSpanPrefix = id + 
IncrementalUpdateConstants.NODEID_BS_DELIM + bin;
+            final SpanBatchDeleteInformation batchInfo = 
SpanBatchDeleteInformation.builder().setColumn(batchInfoColumn)
                     
.setSpan(Span.prefix(Bytes.of(batchInfoSpanPrefix))).build();
             BatchInformationDAO.addBatch(tx, id, batchInfo);
             tx.commit();

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/pruner/PeriodicQueryPruner.java
----------------------------------------------------------------------
diff --git 
a/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/pruner/PeriodicQueryPruner.java
 
b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/pruner/PeriodicQueryPruner.java
index 327154a..c21710a 100644
--- 
a/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/pruner/PeriodicQueryPruner.java
+++ 
b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/pruner/PeriodicQueryPruner.java
@@ -19,6 +19,7 @@
 package org.apache.rya.periodic.notification.pruner;
 
 import java.util.HashSet;
+import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -26,13 +27,12 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.fluo.api.client.FluoClient;
 import org.apache.fluo.api.client.Snapshot;
 import org.apache.fluo.api.client.SnapshotBase;
-import org.apache.log4j.Logger;
 import org.apache.rya.indexing.pcj.fluo.app.NodeType;
 import org.apache.rya.indexing.pcj.fluo.app.util.PeriodicQueryUtil;
 import org.apache.rya.periodic.notification.api.BinPruner;
 import org.apache.rya.periodic.notification.api.NodeBin;
-
-import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Implementation of {@link BinPruner} that deletes old, already processed
@@ -42,63 +42,63 @@ import com.google.common.base.Preconditions;
  */
 public class PeriodicQueryPruner implements BinPruner, Runnable {
 
-    private static final Logger log = 
Logger.getLogger(PeriodicQueryPruner.class);
-    private FluoClient client;
-    private AccumuloBinPruner accPruner;
-    private FluoBinPruner fluoPruner;
-    private BlockingQueue<NodeBin> bins;
-    private AtomicBoolean closed = new AtomicBoolean(false);
-    private int threadNumber;
+    private static final Logger log = 
LoggerFactory.getLogger(PeriodicQueryPruner.class);
+    private final FluoClient client;
+    private final AccumuloBinPruner accPruner;
+    private final FluoBinPruner fluoPruner;
+    private final BlockingQueue<NodeBin> bins;
+    private final AtomicBoolean closed = new AtomicBoolean(false);
+    private final int threadNumber;
 
-    public PeriodicQueryPruner(FluoBinPruner fluoPruner, AccumuloBinPruner 
accPruner, FluoClient client, BlockingQueue<NodeBin> bins, int threadNumber) {
-        this.fluoPruner = Preconditions.checkNotNull(fluoPruner);
-        this.accPruner = Preconditions.checkNotNull(accPruner);
-        this.client = Preconditions.checkNotNull(client);
-        this.bins = Preconditions.checkNotNull(bins);
+    public PeriodicQueryPruner(final FluoBinPruner fluoPruner, final 
AccumuloBinPruner accPruner, final FluoClient client, final 
BlockingQueue<NodeBin> bins, final int threadNumber) {
+        this.fluoPruner = Objects.requireNonNull(fluoPruner);
+        this.accPruner = Objects.requireNonNull(accPruner);
+        this.client = Objects.requireNonNull(client);
+        this.bins = Objects.requireNonNull(bins);
         this.threadNumber = threadNumber;
     }
-    
+
     @Override
     public void run() {
         try {
             while (!closed.get()) {
                 pruneBindingSetBin(bins.take());
             }
-        } catch (InterruptedException e) {
-            log.trace("Thread " + threadNumber + " is unable to prune the next 
message.");
+        } catch (final InterruptedException e) {
+            log.warn("Thread {} is unable to prune the next message.", 
threadNumber);
             throw new RuntimeException(e);
         }
     }
-    
+
     /**
      * Prunes BindingSet bins from the Rya Fluo Application in addition to the 
BindingSet
      * bins created in the PCJ tables associated with the give query id.
-     * @param id - QueryResult Id for the Rya Fluo application 
+     * @param id - QueryResult Id for the Rya Fluo application
      * @param bin - bin id for bins to be deleted
      */
     @Override
-    public void pruneBindingSetBin(NodeBin nodeBin) {
-        String pcjId = nodeBin.getNodeId();
-        long bin = nodeBin.getBin();
+    public void pruneBindingSetBin(final NodeBin nodeBin) {
+        final String pcjId = nodeBin.getNodeId();
+        final long bin = nodeBin.getBin();
         try(Snapshot sx = client.newSnapshot()) {
-            String queryId = NodeType.generateNewIdForType(NodeType.QUERY, 
pcjId);
-            Set<String> fluoIds = getNodeIdsFromResultId(sx, queryId);
+            final String queryId = 
NodeType.generateNewIdForType(NodeType.QUERY, pcjId);
+            final Set<String> fluoIds = getNodeIdsFromResultId(sx, queryId);
             accPruner.pruneBindingSetBin(nodeBin);
-            for(String fluoId: fluoIds) {
+            for(final String fluoId: fluoIds) {
                 fluoPruner.pruneBindingSetBin(new NodeBin(fluoId, bin));
             }
-        } catch (Exception e) {
-            log.trace("Could not successfully initialize 
PeriodicQueryBinPruner.");
+        } catch (final Exception e) {
+            log.warn("Could not successfully initialize 
PeriodicQueryBinPruner.", e);
         }
     }
-    
-    
+
+
     public void shutdown() {
         closed.set(true);
     }
 
-    private Set<String> getNodeIdsFromResultId(SnapshotBase sx, String id) {
-        Set<String> ids = new HashSet<>();
+    private Set<String> getNodeIdsFromResultId(final SnapshotBase sx, final 
String id) {
+        final Set<String> ids = new HashSet<>();
         PeriodicQueryUtil.getPeriodicQueryNodeAncestorIds(sx, id, ids);
         return ids;
     }

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/pruner/PeriodicQueryPrunerExecutor.java
----------------------------------------------------------------------
diff --git 
a/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/pruner/PeriodicQueryPrunerExecutor.java
 
b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/pruner/PeriodicQueryPrunerExecutor.java
index 1c11f96..d3e87c6 100644
--- 
a/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/pruner/PeriodicQueryPrunerExecutor.java
+++ 
b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/pruner/PeriodicQueryPrunerExecutor.java
@@ -26,10 +26,11 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.fluo.api.client.FluoClient;
-import org.apache.log4j.Logger;
 import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
 import org.apache.rya.periodic.notification.api.LifeCycle;
 import org.apache.rya.periodic.notification.api.NodeBin;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;
 
@@ -39,17 +40,17 @@ import com.google.common.base.Preconditions;
  */
 public class PeriodicQueryPrunerExecutor implements LifeCycle {
 
-    private static final Logger log = 
Logger.getLogger(PeriodicQueryPrunerExecutor.class);
-    private FluoClient client;
-    private int numThreads;
-    private ExecutorService executor;
-    private BlockingQueue<NodeBin> bins;
-    private PeriodicQueryResultStorage periodicStorage;
-    private List<PeriodicQueryPruner> pruners;
+    private static final Logger log = 
LoggerFactory.getLogger(PeriodicQueryPrunerExecutor.class);
+    private final FluoClient client;
+    private final int numThreads;
+    private final ExecutorService executor;
+    private final BlockingQueue<NodeBin> bins;
+    private final PeriodicQueryResultStorage periodicStorage;
+    private final List<PeriodicQueryPruner> pruners;
     private boolean running = false;
 
-    public PeriodicQueryPrunerExecutor(PeriodicQueryResultStorage 
periodicStorage, FluoClient client, int numThreads,
-            BlockingQueue<NodeBin> bins) {
+    public PeriodicQueryPrunerExecutor(final PeriodicQueryResultStorage 
periodicStorage, final FluoClient client, final int numThreads,
+            final BlockingQueue<NodeBin> bins) {
         Preconditions.checkArgument(numThreads > 0);
         this.periodicStorage = periodicStorage;
         this.numThreads = numThreads;
@@ -62,11 +63,11 @@ public class PeriodicQueryPrunerExecutor implements 
LifeCycle {
     @Override
     public void start() {
         if (!running) {
-            AccumuloBinPruner accPruner = new 
AccumuloBinPruner(periodicStorage);
-            FluoBinPruner fluoPruner = new FluoBinPruner(client);
+            final AccumuloBinPruner accPruner = new 
AccumuloBinPruner(periodicStorage);
+            final FluoBinPruner fluoPruner = new FluoBinPruner(client);
 
             for (int threadNumber = 0; threadNumber < numThreads; 
threadNumber++) {
-                PeriodicQueryPruner pruner = new 
PeriodicQueryPruner(fluoPruner, accPruner, client, bins, threadNumber);
+                final PeriodicQueryPruner pruner = new 
PeriodicQueryPruner(fluoPruner, accPruner, client, bins, threadNumber);
                 pruners.add(pruner);
                 executor.submit(pruner);
             }
@@ -87,11 +88,11 @@ public class PeriodicQueryPrunerExecutor implements 
LifeCycle {
             running = false;
         }
         try {
-            if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
+            if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
                 log.info("Timed out waiting for consumer threads to shut down, 
exiting uncleanly");
                 executor.shutdownNow();
             }
-        } catch (InterruptedException e) {
+        } catch (final InterruptedException e) {
             log.info("Interrupted during shutdown, exiting uncleanly");
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/registration/kafka/KafkaNotificationProvider.java
----------------------------------------------------------------------
diff --git 
a/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/registration/kafka/KafkaNotificationProvider.java
 
b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/registration/kafka/KafkaNotificationProvider.java
index f5cd13a..f741d20 100644
--- 
a/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/registration/kafka/KafkaNotificationProvider.java
+++ 
b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/registration/kafka/KafkaNotificationProvider.java
@@ -42,11 +42,11 @@ import org.slf4j.LoggerFactory;
  */
 public class KafkaNotificationProvider implements LifeCycle {
     private static final Logger LOG = 
LoggerFactory.getLogger(KafkaNotificationProvider.class);
-    private String topic;
+    private final String topic;
     private ExecutorService executor;
-    private NotificationCoordinatorExecutor coord;
-    private Properties props;
-    private int numThreads;
+    private final NotificationCoordinatorExecutor coord;
+    private final Properties props;
+    private final int numThreads;
     private boolean running = false;
     Deserializer<String> keyDe;
     Deserializer<CommandNotification> valDe;
@@ -54,15 +54,15 @@ public class KafkaNotificationProvider implements LifeCycle 
{
 
     /**
      * Create KafkaNotificationProvider for reading new notification requests 
form Kafka
-     * @param topic - notification topic    
+     * @param topic - notification topic
      * @param keyDe - Kafka message key deserializer
      * @param valDe - Kafka message value deserializer
      * @param props - properties used to creates a {@link KafkaConsumer}
      * @param coord - {@link NotificationCoordinatorExecutor} for managing and 
generating notifications
      * @param numThreads - number of threads used by this notification provider
      */
-    public KafkaNotificationProvider(String topic, Deserializer<String> keyDe, 
Deserializer<CommandNotification> valDe, Properties props,
-            NotificationCoordinatorExecutor coord, int numThreads) {
+    public KafkaNotificationProvider(final String topic, final 
Deserializer<String> keyDe, final Deserializer<CommandNotification> valDe, 
final Properties props,
+            final NotificationCoordinatorExecutor coord, final int numThreads) 
{
         this.coord = coord;
         this.numThreads = numThreads;
         this.topic = topic;
@@ -75,7 +75,7 @@ public class KafkaNotificationProvider implements LifeCycle {
     @Override
     public void stop() {
         if (consumers != null && consumers.size() > 0) {
-            for (PeriodicNotificationConsumer consumer : consumers) {
+            for (final PeriodicNotificationConsumer consumer : consumers) {
                 consumer.shutdown();
             }
         }
@@ -88,11 +88,12 @@ public class KafkaNotificationProvider implements LifeCycle 
{
                 LOG.info("Timed out waiting for consumer threads to shut down, 
exiting uncleanly");
                 executor.shutdownNow();
             }
-        } catch (InterruptedException e) {
+        } catch (final InterruptedException e) {
             LOG.info("Interrupted during shutdown, exiting uncleanly");
         }
     }
 
+    @Override
     public void start() {
         if (!running) {
             if (!coord.currentlyRunning()) {
@@ -102,14 +103,12 @@ public class KafkaNotificationProvider implements 
LifeCycle {
             executor = Executors.newFixedThreadPool(numThreads);
 
             // now create consumers to consume the messages
-            int threadNumber = 0;
-            for (int i = 0; i < numThreads; i++) {
-                LOG.info("Creating consumer:" + threadNumber);
-                KafkaConsumer<String, CommandNotification> consumer = new 
KafkaConsumer<String, CommandNotification>(props, keyDe, valDe);
-                PeriodicNotificationConsumer periodicConsumer = new 
PeriodicNotificationConsumer(topic, consumer, threadNumber, coord);
+            for (int threadNumber = 0; threadNumber < numThreads; 
threadNumber++) {
+                LOG.info("Creating consumer: {} for Kafka topic: {}", 
threadNumber, topic);
+                final KafkaConsumer<String, CommandNotification> consumer = 
new KafkaConsumer<String, CommandNotification>(props, keyDe, valDe);
+                final PeriodicNotificationConsumer periodicConsumer = new 
PeriodicNotificationConsumer(topic, consumer, threadNumber, coord);
                 consumers.add(periodicConsumer);
                 executor.submit(periodicConsumer);
-                threadNumber++;
             }
             running = true;
         }

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/registration/kafka/PeriodicNotificationConsumer.java
----------------------------------------------------------------------
diff --git 
a/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/registration/kafka/PeriodicNotificationConsumer.java
 
b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/registration/kafka/PeriodicNotificationConsumer.java
index 6785ce8..f8fbed8 100644
--- 
a/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/registration/kafka/PeriodicNotificationConsumer.java
+++ 
b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/registration/kafka/PeriodicNotificationConsumer.java
@@ -25,9 +25,10 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.errors.WakeupException;
-import org.apache.log4j.Logger;
 import 
org.apache.rya.periodic.notification.api.NotificationCoordinatorExecutor;
 import org.apache.rya.periodic.notification.notification.CommandNotification;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Consumer for the {@link KafkaNotificationProvider}.  This consumer pull 
messages
@@ -35,52 +36,55 @@ import 
org.apache.rya.periodic.notification.notification.CommandNotification;
  *
  */
 public class PeriodicNotificationConsumer implements Runnable {
-    private KafkaConsumer<String, CommandNotification> consumer;
-    private int m_threadNumber;
-    private String topic;
+    private final KafkaConsumer<String, CommandNotification> consumer;
+    private final int threadNumber;
+    private final String topic;
     private final AtomicBoolean closed = new AtomicBoolean(false);
-    private NotificationCoordinatorExecutor coord;
-    private static final Logger LOG = 
Logger.getLogger(PeriodicNotificationConsumer.class);
+    private final NotificationCoordinatorExecutor coord;
+    private static final Logger LOG = 
LoggerFactory.getLogger(PeriodicNotificationConsumer.class);
 
     /**
      * Creates a new PeriodicNotificationConsumer for consuming new 
notification requests from
      * Kafka.
      * @param topic - new notification topic
      * @param consumer - consumer for pulling new requests from Kafka
-     * @param a_threadNumber - number of consumer threads to be used
+     * @param threadNumber - an identifier for this thread.
      * @param coord - notification coordinator for managing and generating 
notifications
      */
-    public PeriodicNotificationConsumer(String topic, KafkaConsumer<String, 
CommandNotification> consumer, int a_threadNumber,
-            NotificationCoordinatorExecutor coord) {
+    public PeriodicNotificationConsumer(final String topic, final 
KafkaConsumer<String, CommandNotification> consumer, final int threadNumber,
+            final NotificationCoordinatorExecutor coord) {
         this.topic = topic;
-        m_threadNumber = a_threadNumber;
+        this.threadNumber = threadNumber;
         this.consumer = consumer;
         this.coord = coord;
     }
 
+    @Override
     public void run() {
-        
+
         try {
-            LOG.info("Creating kafka stream for consumer:" + m_threadNumber);
+            LOG.info("Configuring KafkaConsumer on thread: {} to subscribe to 
topic: {}", threadNumber, topic);
             consumer.subscribe(Arrays.asList(topic));
             while (!closed.get()) {
-                ConsumerRecords<String, CommandNotification> records = 
consumer.poll(10000);
+                final ConsumerRecords<String, CommandNotification> records = 
consumer.poll(10000);
                 // Handle new records
-                for(ConsumerRecord<String, CommandNotification> record: 
records) {
-                    CommandNotification notification = record.value();
-                    LOG.info("Thread " + m_threadNumber + " is adding 
notification " + notification + " to queue.");
-                    LOG.info("Message: " + notification);
+                for(final ConsumerRecord<String, CommandNotification> record: 
records) {
+                    final CommandNotification notification = record.value();
+                    LOG.info("Thread {} is adding notification: {}", 
threadNumber, notification);
                     coord.processNextCommandNotification(notification);
                 }
             }
-        } catch (WakeupException e) {
+            LOG.info("Finished polling.");
+        } catch (final WakeupException e) {
             // Ignore exception if closing
-            if (!closed.get()) throw e;
+            if (!closed.get()) {
+                throw e;
+            }
         } finally {
             consumer.close();
         }
     }
-    
+
     public void shutdown() {
         closed.set(true);
         consumer.wakeup();

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/periodic.notification/tests/pom.xml
----------------------------------------------------------------------
diff --git a/extras/periodic.notification/tests/pom.xml 
b/extras/periodic.notification/tests/pom.xml
index feb1f0f..3f0c413 100644
--- a/extras/periodic.notification/tests/pom.xml
+++ b/extras/periodic.notification/tests/pom.xml
@@ -1,14 +1,22 @@
-<?xml version="1.0" encoding="utf-8"?>
-<!-- Licensed to the Apache Software Foundation (ASF) under one or more 
contributor 
-    license agreements. See the NOTICE file distributed with this work for 
additional 
-    information regarding copyright ownership. The ASF licenses this file to 
-    you under the Apache License, Version 2.0 (the "License"); you may not use 
-    this file except in compliance with the License. You may obtain a copy of 
-    the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required 
-    by applicable law or agreed to in writing, software distributed under the 
-    License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR 
CONDITIONS 
-    OF ANY KIND, either express or implied. See the License for the specific 
-    language governing permissions and limitations under the License. -->
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
 <project xmlns="http://maven.apache.org/POM/4.0.0" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <modelVersion>4.0.0</modelVersion>
 

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/periodic.notification/tests/src/test/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationIT.java
----------------------------------------------------------------------
diff --git 
a/extras/periodic.notification/tests/src/test/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationIT.java
 
b/extras/periodic.notification/tests/src/test/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationIT.java
index 3b6062f..92e3276 100644
--- 
a/extras/periodic.notification/tests/src/test/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationIT.java
+++ 
b/extras/periodic.notification/tests/src/test/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationIT.java
@@ -126,7 +126,7 @@ public class PeriodicNotificationApplicationIT extends 
RyaExportITBase {
         producer = new KafkaProducer<>(kafkaProps, new StringSerializer(), new 
CommandNotificationSerializer());
 
         //extract kafka specific properties from application config
-        app = 
PeriodicNotificationApplicationFactory.getPeriodicApplication(props);
+        app = 
PeriodicNotificationApplicationFactory.getPeriodicApplication(conf);
         registrar = new 
KafkaNotificationRegistrationClient(conf.getNotificationTopic(), producer);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/periodic.notification/tests/src/test/resources/notification.properties
----------------------------------------------------------------------
diff --git 
a/extras/periodic.notification/tests/src/test/resources/notification.properties 
b/extras/periodic.notification/tests/src/test/resources/notification.properties
index 4b25b93..b5f2a90 100644
--- 
a/extras/periodic.notification/tests/src/test/resources/notification.properties
+++ 
b/extras/periodic.notification/tests/src/test/resources/notification.properties
@@ -1,4 +1,3 @@
-#
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
 # distributed with this work for additional information
@@ -15,21 +14,21 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-#/
+
 accumulo.auths=
 accumulo.instance="instance"
 accumulo.user="root"
 accumulo.password="secret"
 accumulo.rya.prefix="rya_"
 accumulo.zookeepers=
-fluo.app.name="fluo_app"
-fluo.table.name="fluo_table"
-kafka.bootstrap.servers=127.0.0.1:9092
-kafka.notification.topic=notifications
-kafka.notification.client.id=consumer0
-kafka.notification.group.id=group0
-cep.coordinator.threads=1
-cep.producer.threads=1
-cep.exporter.threads=1
-cep.processor.threads=1
-cep.pruner.threads=1
\ No newline at end of file
+rya.pcj.fluo.app.name="fluo_app"
+rya.pcj.fluo.table.name="fluo_table"
+rya.periodic.notification.kafka.bootstrap.servers=127.0.0.1:9092
+rya.periodic.notification.kafka.topic=notifications
+rya.periodic.notification.kafka.client.id=consumer0
+rya.periodic.notification.kafka.group.id=group0
+rya.periodic.notification.coordinator.threads=1
+rya.periodic.notification.producer.threads=1
+rya.periodic.notification.exporter.threads=1
+rya.periodic.notification.processor.threads=1
+rya.periodic.notification.pruner.threads=1
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/periodic.notification/twill.yarn/README.md
----------------------------------------------------------------------
diff --git a/extras/periodic.notification/twill.yarn/README.md 
b/extras/periodic.notification/twill.yarn/README.md
new file mode 100644
index 0000000..0475c9c
--- /dev/null
+++ b/extras/periodic.notification/twill.yarn/README.md
@@ -0,0 +1,18 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/periodic.notification/twill.yarn/pom.xml
----------------------------------------------------------------------
diff --git a/extras/periodic.notification/twill.yarn/pom.xml 
b/extras/periodic.notification/twill.yarn/pom.xml
new file mode 100644
index 0000000..7b9fd02
--- /dev/null
+++ b/extras/periodic.notification/twill.yarn/pom.xml
@@ -0,0 +1,98 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.rya</groupId>
+        <artifactId>rya.periodic.notification.parent</artifactId>
+        <version>3.2.12-incubating-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>rya.periodic.notification.twill.yarn</artifactId>
+
+    <name>Apache Rya Periodic Notification Service on Twill on YARN </name>
+    <description>Twill Application for executing the Apache Rya Periodic Notification Service on YARN</description>
+
+    <dependencies>
+        <dependency>
+            <groupId>com.beust</groupId>
+            <artifactId>jcommander</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+            <version>13.0.1</version> <!-- Override Rya's DependencyManagement. The Twill runtime requires 11.0 < guava < 14.0 (Service.listener) (AbstractExecutionThreadService.getServiceName) -->
+        </dependency>
+        <dependency>
+            <groupId>org.apache.twill</groupId>
+            <artifactId>twill-yarn</artifactId>
+            <version>0.12.0</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.rya</groupId>
+            <artifactId>rya.periodic.notification.twill</artifactId>
+            <version>3.2.12-incubating-SNAPSHOT</version>
+            <classifier>twill-app</classifier>
+            <scope>provided</scope>  <!--  prevent these dependencies from showing up on the classpath -->
+        </dependency>
+        
+        <!-- Add Accumulo as a runtime dependency -->
+        <dependency>
+            <groupId>org.apache.accumulo</groupId>
+            <artifactId>accumulo-core</artifactId>
+            <scope>runtime</scope>
+            <exclusions>
+                <!--  exclude logging implementations -->
+                <exclusion>
+                    <groupId>commons-logging</groupId>
+                    <artifactId>commons-logging</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>create-binary-distribution</id>
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                        <phase>package</phase>
+                        <configuration>
+                            <tarLongFileMode>gnu</tarLongFileMode>
+                            <descriptors>
+                                <descriptor>src/main/assembly/binary-release.xml</descriptor>
+                            </descriptors>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/periodic.notification/twill.yarn/src/main/assembly/binary-release.xml
----------------------------------------------------------------------
diff --git a/extras/periodic.notification/twill.yarn/src/main/assembly/binary-release.xml b/extras/periodic.notification/twill.yarn/src/main/assembly/binary-release.xml
new file mode 100644
index 0000000..a3bba3e
--- /dev/null
+++ b/extras/periodic.notification/twill.yarn/src/main/assembly/binary-release.xml
@@ -0,0 +1,30 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.3" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.3 http://maven.apache.org/xsd/assembly-1.1.3.xsd">
+    <id>bin</id>
+    <formats>
+        <format>tar.gz</format>
+    </formats>
+    <includeBaseDirectory>true</includeBaseDirectory>
+    <componentDescriptors>
+        <componentDescriptor>src/main/assembly/component-release.xml</componentDescriptor>
+    </componentDescriptors>
+</assembly>

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/periodic.notification/twill.yarn/src/main/assembly/component-release.xml
----------------------------------------------------------------------
diff --git a/extras/periodic.notification/twill.yarn/src/main/assembly/component-release.xml b/extras/periodic.notification/twill.yarn/src/main/assembly/component-release.xml
new file mode 100644
index 0000000..8aabf93
--- /dev/null
+++ b/extras/periodic.notification/twill.yarn/src/main/assembly/component-release.xml
@@ -0,0 +1,104 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+<component
+    xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/component/1.1.3"
+    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/component/1.1.3 http://maven.apache.org/xsd/component-1.1.3.xsd">
+    <fileSets>
+        <fileSet>
+            <directory>src/main/scripts</directory>
+            <outputDirectory>bin</outputDirectory>
+            <directoryMode>0755</directoryMode>
+            <fileMode>0755</fileMode>
+            <includes>
+                <include>*.sh</include>
+            </includes>
+            <lineEnding>unix</lineEnding>
+            <filtered>true</filtered>
+        </fileSet>
+        <fileSet>
+            <directory>src/main/config</directory>
+            <outputDirectory>conf</outputDirectory>
+            <directoryMode>0755</directoryMode>
+            <fileMode>0644</fileMode>
+            <includes>
+                <include>*.xml</include>
+                <include>*.properties</include>
+                <include>hadoop/*.xml</include>
+            </includes>
+            <lineEnding>unix</lineEnding>
+        </fileSet>
+        <fileSet>
+            <directory>src/main/config</directory>
+            <outputDirectory>conf</outputDirectory>
+            <directoryMode>0755</directoryMode>
+            <fileMode>0755</fileMode>
+            <includes>
+                <include>*.sh</include>
+            </includes>
+            <lineEnding>unix</lineEnding>
+        </fileSet>
+        
+        <!-- create an empty directory for log files -->
+        <fileSet>
+            <directory>src/main/assembly</directory>
+            <outputDirectory>logs</outputDirectory>
+            <directoryMode>755</directoryMode>
+            <excludes>
+                <exclude>*</exclude>
+            </excludes>
+        </fileSet>
+    </fileSets>
+    <dependencySets>
+        <dependencySet>
+            <outputDirectory>lib</outputDirectory>
+            <useTransitiveFiltering>true</useTransitiveFiltering>
+            <scope>runtime</scope>
+            <!-- exclude hadoop, zookeeper jars -->
+            <excludes>
+                <exclude>org.apache.accumulo:*</exclude>
+                <exclude>org.apache.hadoop:*</exclude>
+                <exclude>org.apache.zookeeper:*</exclude>
+            </excludes>
+        </dependencySet>
+        <dependencySet>
+            <outputDirectory>lib</outputDirectory>
+            <scope>provided</scope>
+            <includes>
+                <include>org.apache.rya:rya.periodic.notification.twill:*:twill-app</include>
+            </includes>
+        </dependencySet>
+        <dependencySet>
+            <outputDirectory>lib-ahz</outputDirectory>
+            <useTransitiveFiltering>true</useTransitiveFiltering>
+            <scope>runtime</scope>
+            <!-- store the hadoop, zookeeper jars in a specific dir -->
+            <includes>
+                <include>org.apache.accumulo:*</include>
+                <include>org.apache.hadoop:*</include>
+                <include>org.apache.zookeeper:*</include>
+            </includes>
+            <excludes>
+                <exclude>log4j:log4j</exclude> <!-- twill uses logback & slf4j. -->
+            </excludes>
+        </dependencySet>
+        
+    </dependencySets>
+</component>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/periodic.notification/twill.yarn/src/main/config/hadoop/core-site.xml
----------------------------------------------------------------------
diff --git a/extras/periodic.notification/twill.yarn/src/main/config/hadoop/core-site.xml b/extras/periodic.notification/twill.yarn/src/main/config/hadoop/core-site.xml
new file mode 100644
index 0000000..fa31186
--- /dev/null
+++ b/extras/periodic.notification/twill.yarn/src/main/config/hadoop/core-site.xml
@@ -0,0 +1,25 @@
+<?xml version="1.0"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+
+<configuration>
+    <property>
+        <name>fs.defaultFS</name>
+        <value>hdfs://hdfs-namenode-host:8020/</value>
+    </property>
+</configuration>

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/periodic.notification/twill.yarn/src/main/config/hadoop/yarn-site.xml
----------------------------------------------------------------------
diff --git a/extras/periodic.notification/twill.yarn/src/main/config/hadoop/yarn-site.xml b/extras/periodic.notification/twill.yarn/src/main/config/hadoop/yarn-site.xml
new file mode 100644
index 0000000..a9de6dc
--- /dev/null
+++ b/extras/periodic.notification/twill.yarn/src/main/config/hadoop/yarn-site.xml
@@ -0,0 +1,25 @@
+<?xml version="1.0"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+
+<configuration>
+    <property>
+        <name>yarn.resourcemanager.hostname</name>
+        <value>yarn-resourcemanager-host</value>
+    </property>  
+</configuration>

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/periodic.notification/twill.yarn/src/main/config/logback.xml
----------------------------------------------------------------------
diff --git a/extras/periodic.notification/twill.yarn/src/main/config/logback.xml b/extras/periodic.notification/twill.yarn/src/main/config/logback.xml
new file mode 100644
index 0000000..ed36338
--- /dev/null
+++ b/extras/periodic.notification/twill.yarn/src/main/config/logback.xml
@@ -0,0 +1,57 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+<configuration>
+
+    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+        <!-- encoders are assigned the type ch.qos.logback.classic.encoder.PatternLayoutEncoder by default -->
+        <encoder>
+            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{48} - %msg%n</pattern>
+        </encoder>
+    </appender>
+
+    <appender name="FILE" class="ch.qos.logback.core.FileAppender">
+        <file>logs/PeriodicNotificationApp.log</file>
+        <append>true</append>
+        <!-- encoders are assigned the type ch.qos.logback.classic.encoder.PatternLayoutEncoder by default -->
+        <encoder>
+            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{48} - %msg%n</pattern>
+            <!-- set immediateFlush to false for much higher logging throughput -->
+            <immediateFlush>false</immediateFlush>
+            <charset>UTF-8</charset>
+        </encoder>
+    </appender>
+
+    <logger name="org.apache.rya" level="DEBUG" />
+    <logger name="org.apache.accumulo" level="INFO" />
+    <logger name="org.apache.hadoop" level="INFO" />
+    <logger name="org.apache.fluo" level="INFO" />
+    <logger name="fluo.tx" level="INFO" />
+    <logger name="kafka" level="INFO" />
+    <logger name="org.apache.kafka" level="INFO" />
+    <logger name="org.apache.zookeeper" level="INFO" />
+    <logger name="org.apache.curator" level="INFO" />
+    <logger name="org.apache.twill" level="INFO" />
+
+    <root level="DEBUG">
+        <appender-ref ref="STDOUT" />
+        <appender-ref ref="FILE" />
+    </root>
+
+</configuration>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/periodic.notification/twill.yarn/src/main/config/notification.properties
----------------------------------------------------------------------
diff --git a/extras/periodic.notification/twill.yarn/src/main/config/notification.properties b/extras/periodic.notification/twill.yarn/src/main/config/notification.properties
new file mode 100644
index 0000000..6d7b47b
--- /dev/null
+++ b/extras/periodic.notification/twill.yarn/src/main/config/notification.properties
@@ -0,0 +1,67 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# String of Accumulo authorizations.
+#accumulo.auths=
+
+# Accumulo instance name (required)
+accumulo.instance=
+
+# Accumulo user (required)
+accumulo.user=
+
+# Accumulo password (required)
+accumulo.password=
+
+# Prefix for Accumulo backed Rya instance.
+#accumulo.rya.prefix=rya_
+
+# Zookeepers for underlying Accumulo instance (required)
+accumulo.zookeepers=
+
+# Name of Fluo Application (required)
+rya.pcj.fluo.app.name=
+
+# Name of Fluo Table (required)
+rya.pcj.fluo.table.name=
+
+# Kafka Bootstrap servers for Producers and Consumers (required)
+rya.periodic.notification.kafka.bootstrap.servers=
+
+# Topic to which new Periodic Notifications are published.
+#rya.periodic.notification.kafka.topic=notifications
+
+# Client Id for notification topic.
+#rya.periodic.notification.kafka.client.id=consumer0
+
+# Group Id for notification topic.
+#rya.periodic.notification.kafka.group.id=group0
+
+# Number of threads used by coordinator.
+#rya.periodic.notification.coordinator.threads=1
+
+# Number of threads used by producer.
+#rya.periodic.notification.producer.threads=1
+
+# Number of threads used by exporter.
+#rya.periodic.notification.exporter.threads=1
+
+# Number of threads used by processor.
+#rya.periodic.notification.processor.threads=1
+
+# Number of threads used by pruner.
+#rya.periodic.notification.pruner.threads=1
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/periodic.notification/twill.yarn/src/main/config/twill-env.sh
----------------------------------------------------------------------
diff --git a/extras/periodic.notification/twill.yarn/src/main/config/twill-env.sh b/extras/periodic.notification/twill.yarn/src/main/config/twill-env.sh
new file mode 100644
index 0000000..f8716de
--- /dev/null
+++ b/extras/periodic.notification/twill.yarn/src/main/config/twill-env.sh
@@ -0,0 +1,63 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+#addToClasspath() 
+#{
+#  local dir=$1
+#  local filterRegex=$2
+
+
+#  if [ ! -d "$dir" ]; then
+#    echo "ERROR $dir does not exist or not a directory"
+#    exit 1
+#  fi
+
+#  for jar in $dir/*.jar; do
+#    if ! [[ $jar =~ $filterRegex ]]; then
+#       CLASSPATH="$CLASSPATH:$jar"
+#    fi
+#  done
+#}
+
+loadLocalAHZLibraries()
+{
+# Specify a hadoop configuration directory to use for connecting to the YARN cluster.
+# At a minimum, this directory should contain core-site.xml and yarn-site.xml.
+#HADOOP_CONF_DIR=/etc/hadoop/conf
+HADOOP_CONF_DIR=conf/hadoop
+
+# use local libraries (lib-ahz) for the classpath and append the hadoop conf directory
+TWILL_CP=lib/*:lib-ahz/*:$HADOOP_CONF_DIR
+}
+
+loadSystemAHZLibraries()
+{
+#EXCLUDE_RE="(.*log4j.*)|(.*asm.*)|(.*guava.*)|(.*gson.*)"
+#addToClasspath "$ACCUMULO_HOME/lib" $EXCLUDE_RE
+
+# or use the hadoop classpath as specified by your path's hadoop command
+# TODO add excludes for jars that should not be included
+HADOOP_CP=$(hadoop classpath)
+
+# use the hadoop classpath resolved above
+TWILL_CP=lib/*:$HADOOP_CP
+}
+
+# Select which method to use for resolving Accumulo, Hadoop, and Zookeeper libraries and configuration.
+loadLocalAHZLibraries
+#loadSystemAHZLibraries

Reply via email to