http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationIT.java b/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationIT.java
deleted file mode 100644
index 9109775..0000000
--- a/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationIT.java
+++ /dev/null
@@ -1,493 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.rya.periodic.notification.application;
-
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.time.ZonedDateTime;
-import java.time.format.DateTimeFormatter;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Properties;
-import java.util.Set;
-import java.util.UUID;
-
-import javax.xml.datatype.DatatypeConfigurationException;
-import javax.xml.datatype.DatatypeFactory;
-
-import org.apache.accumulo.core.client.Connector;
-import org.apache.fluo.api.client.FluoClient;
-import org.apache.fluo.api.config.FluoConfiguration;
-import org.apache.fluo.core.client.FluoClientImpl;
-import org.apache.kafka.clients.CommonClientConfigs;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.rya.api.resolver.RdfToRyaConversions;
-import org.apache.rya.indexing.accumulo.ConfigUtils;
-import org.apache.rya.indexing.pcj.fluo.api.CreatePeriodicQuery;
-import org.apache.rya.indexing.pcj.fluo.api.InsertTriples;
-import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants;
-import org.apache.rya.indexing.pcj.fluo.app.util.FluoClientFactory;
-import org.apache.rya.indexing.pcj.fluo.app.util.FluoQueryUtils;
-import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
-import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator;
-import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPeriodicQueryResultStorage;
-import org.apache.rya.kafka.base.EmbeddedKafkaInstance;
-import org.apache.rya.kafka.base.EmbeddedKafkaSingleton;
-import org.apache.rya.kafka.base.KafkaTestInstanceRule;
-import org.apache.rya.pcj.fluo.test.base.RyaExportITBase;
-import org.apache.rya.periodic.notification.notification.CommandNotification;
-import org.apache.rya.periodic.notification.registration.KafkaNotificationRegistrationClient;
-import org.apache.rya.periodic.notification.serialization.BindingSetSerDe;
-import org.apache.rya.periodic.notification.serialization.CommandNotificationSerializer;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
-import org.openrdf.model.Statement;
-import org.openrdf.model.Value;
-import org.openrdf.model.ValueFactory;
-import org.openrdf.model.impl.LiteralImpl;
-import org.openrdf.model.impl.ValueFactoryImpl;
-import org.openrdf.model.vocabulary.XMLSchema;
-import org.openrdf.query.BindingSet;
-import org.openrdf.query.algebra.evaluation.QueryBindingSet;
-
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Multimap;
-import com.google.common.collect.Sets;
-
-import static org.apache.rya.periodic.notification.application.PeriodicNotificationApplicationConfiguration.NOTIFICATION_TOPIC;
-import static org.apache.rya.periodic.notification.application.PeriodicNotificationApplicationConfiguration.KAFKA_BOOTSTRAP_SERVERS;;
-
-
-public class PeriodicNotificationApplicationIT extends RyaExportITBase {
-
-    private PeriodicNotificationApplication app;
-    private KafkaNotificationRegistrationClient registrar;
-    private KafkaProducer<String, CommandNotification> producer;
-    private Properties props;
-    private Properties kafkaProps;
-    private PeriodicNotificationApplicationConfiguration conf;
-    private static EmbeddedKafkaInstance embeddedKafka = EmbeddedKafkaSingleton.getInstance();
-    private static String bootstrapServers;
-    
-    @Rule
-    public KafkaTestInstanceRule rule = new KafkaTestInstanceRule(false);
-    
-    @BeforeClass
-    public static void initClass() {
-        bootstrapServers = embeddedKafka.createBootstrapServerConfig().getProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG);
-    }
-    
-    @Before
-    public void init() throws Exception {
-        String topic = rule.getKafkaTopicName();
-        rule.createTopic(topic);
-        
-        //get user specified props and update with the embedded kafka bootstrap servers and rule generated topic
-        props = getProps();
-        props.setProperty(NOTIFICATION_TOPIC, topic);
-        props.setProperty(KAFKA_BOOTSTRAP_SERVERS, bootstrapServers);
-        conf = new PeriodicNotificationApplicationConfiguration(props);
-        
-        //create Kafka Producer
-        kafkaProps = getKafkaProperties(conf);
-        producer = new KafkaProducer<>(kafkaProps, new StringSerializer(), new CommandNotificationSerializer());
-        
-        //extract kafka specific properties from application config
-        app = PeriodicNotificationApplicationFactory.getPeriodicApplication(props);
-        registrar = new KafkaNotificationRegistrationClient(conf.getNotificationTopic(), producer);
-    }
-    
-    @Test
-    public void periodicApplicationWithAggAndGroupByTest() throws Exception {
-
-        String sparql = "prefix function: <http://org.apache.rya/function#> " 
// n
-                + "prefix time: <http://www.w3.org/2006/time#> " // n
-                + "select ?type (count(?obs) as ?total) where {" // n
-                + "Filter(function:periodic(?time, 1, .25, time:minutes)) " // 
n
-                + "?obs <uri:hasTime> ?time. " // n
-                + "?obs <uri:hasObsType> ?type } group by ?type"; // n
-        
-        //make data
-        int periodMult = 15;
-        final ValueFactory vf = new ValueFactoryImpl();
-        final DatatypeFactory dtf = DatatypeFactory.newInstance();
-        //Sleep until current time aligns nicely with period to makell
-        //results more predictable
-        while(System.currentTimeMillis() % (periodMult*1000) > 500);
-        ZonedDateTime time = ZonedDateTime.now();
-
-        ZonedDateTime zTime1 = time.minusSeconds(2*periodMult);
-        String time1 = zTime1.format(DateTimeFormatter.ISO_INSTANT);
-
-        ZonedDateTime zTime2 = zTime1.minusSeconds(periodMult);
-        String time2 = zTime2.format(DateTimeFormatter.ISO_INSTANT);
-
-        ZonedDateTime zTime3 = zTime2.minusSeconds(periodMult);
-        String time3 = zTime3.format(DateTimeFormatter.ISO_INSTANT);
-
-        final Collection<Statement> statements = Sets.newHashSet(
-                vf.createStatement(vf.createURI("urn:obs_1"), 
vf.createURI("uri:hasTime"),
-                        vf.createLiteral(dtf.newXMLGregorianCalendar(time1))),
-                vf.createStatement(vf.createURI("urn:obs_1"), 
vf.createURI("uri:hasObsType"), vf.createLiteral("ship")),
-                vf.createStatement(vf.createURI("urn:obs_2"), 
vf.createURI("uri:hasTime"),
-                        vf.createLiteral(dtf.newXMLGregorianCalendar(time1))),
-                vf.createStatement(vf.createURI("urn:obs_2"), 
vf.createURI("uri:hasObsType"), vf.createLiteral("airplane")),
-                vf.createStatement(vf.createURI("urn:obs_3"), 
vf.createURI("uri:hasTime"),
-                        vf.createLiteral(dtf.newXMLGregorianCalendar(time2))),
-                vf.createStatement(vf.createURI("urn:obs_3"), 
vf.createURI("uri:hasObsType"), vf.createLiteral("ship")),
-                vf.createStatement(vf.createURI("urn:obs_4"), 
vf.createURI("uri:hasTime"),
-                        vf.createLiteral(dtf.newXMLGregorianCalendar(time2))),
-                vf.createStatement(vf.createURI("urn:obs_4"), 
vf.createURI("uri:hasObsType"), vf.createLiteral("airplane")),
-                vf.createStatement(vf.createURI("urn:obs_5"), 
vf.createURI("uri:hasTime"),
-                        vf.createLiteral(dtf.newXMLGregorianCalendar(time3))),
-                vf.createStatement(vf.createURI("urn:obs_5"), 
vf.createURI("uri:hasObsType"), vf.createLiteral("automobile")));
-        
-        try (FluoClient fluo = 
FluoClientFactory.getFluoClient(conf.getFluoAppName(), 
Optional.of(conf.getFluoTableName()), conf)) {
-            Connector connector = ConfigUtils.getConnector(conf);
-            PeriodicQueryResultStorage storage = new 
AccumuloPeriodicQueryResultStorage(connector, conf.getTablePrefix());
-            CreatePeriodicQuery periodicQuery = new CreatePeriodicQuery(fluo, 
storage);
-            String id = 
FluoQueryUtils.convertFluoQueryIdToPcjId(periodicQuery.createPeriodicQuery(sparql,
 registrar).getQueryId());
-            addData(statements);
-            app.start();
-           
-            Multimap<Long, BindingSet> actual = HashMultimap.create();
-            try (KafkaConsumer<String, BindingSet> consumer = new 
KafkaConsumer<>(kafkaProps, new StringDeserializer(), new BindingSetSerDe())) {
-                consumer.subscribe(Arrays.asList(id));
-                long end = System.currentTimeMillis() + 4*periodMult*1000;
-                long lastBinId = 0L;
-                long binId = 0L;
-                List<Long> ids = new ArrayList<>();
-                while (System.currentTimeMillis() < end) {
-                    ConsumerRecords<String, BindingSet> records = 
consumer.poll(periodMult*1000);
-                    for(ConsumerRecord<String, BindingSet> record: records){
-                        BindingSet result = record.value();
-                        binId = 
Long.parseLong(result.getBinding(IncrementalUpdateConstants.PERIODIC_BIN_ID).getValue().stringValue());
-                        if(lastBinId != binId) {
-                            lastBinId = binId;
-                            ids.add(binId);
-                        }
-                        actual.put(binId, result);
-                    }
-                }
-                
-                Map<Long, Set<BindingSet>> expected = new HashMap<>();
-                
-                Set<BindingSet> expected1 = new HashSet<>();
-                QueryBindingSet bs1 = new QueryBindingSet();
-                bs1.addBinding(IncrementalUpdateConstants.PERIODIC_BIN_ID, 
vf.createLiteral(ids.get(0)));
-                bs1.addBinding("total", new LiteralImpl("2", 
XMLSchema.INTEGER));
-                bs1.addBinding("type", vf.createLiteral("airplane"));
-                
-                QueryBindingSet bs2 = new QueryBindingSet();
-                bs2.addBinding(IncrementalUpdateConstants.PERIODIC_BIN_ID, 
vf.createLiteral(ids.get(0)));
-                bs2.addBinding("total", new LiteralImpl("2", 
XMLSchema.INTEGER));
-                bs2.addBinding("type", vf.createLiteral("ship"));
-                
-                QueryBindingSet bs3 = new QueryBindingSet();
-                bs3.addBinding(IncrementalUpdateConstants.PERIODIC_BIN_ID, 
vf.createLiteral(ids.get(0)));
-                bs3.addBinding("total", new LiteralImpl("1", 
XMLSchema.INTEGER));
-                bs3.addBinding("type", vf.createLiteral("automobile"));
-                
-                expected1.add(bs1);
-                expected1.add(bs2);
-                expected1.add(bs3);
-                
-                Set<BindingSet> expected2 = new HashSet<>();
-                QueryBindingSet bs4 = new QueryBindingSet();
-                bs4.addBinding(IncrementalUpdateConstants.PERIODIC_BIN_ID, 
vf.createLiteral(ids.get(1)));
-                bs4.addBinding("total", new LiteralImpl("2", 
XMLSchema.INTEGER));
-                bs4.addBinding("type", vf.createLiteral("airplane"));
-                
-                QueryBindingSet bs5 = new QueryBindingSet();
-                bs5.addBinding(IncrementalUpdateConstants.PERIODIC_BIN_ID, 
vf.createLiteral(ids.get(1)));
-                bs5.addBinding("total", new LiteralImpl("2", 
XMLSchema.INTEGER));
-                bs5.addBinding("type", vf.createLiteral("ship"));
-                
-                expected2.add(bs4);
-                expected2.add(bs5);
-                
-                Set<BindingSet> expected3 = new HashSet<>();
-                QueryBindingSet bs6 = new QueryBindingSet();
-                bs6.addBinding(IncrementalUpdateConstants.PERIODIC_BIN_ID, 
vf.createLiteral(ids.get(2)));
-                bs6.addBinding("total", new LiteralImpl("1", 
XMLSchema.INTEGER));
-                bs6.addBinding("type", vf.createLiteral("ship"));
-                
-                QueryBindingSet bs7 = new QueryBindingSet();
-                bs7.addBinding(IncrementalUpdateConstants.PERIODIC_BIN_ID, 
vf.createLiteral(ids.get(2)));
-                bs7.addBinding("total", new LiteralImpl("1", 
XMLSchema.INTEGER));
-                bs7.addBinding("type", vf.createLiteral("airplane"));
-                
-                expected3.add(bs6);
-                expected3.add(bs7);
-                
-                expected.put(ids.get(0), expected1);
-                expected.put(ids.get(1), expected2);
-                expected.put(ids.get(2), expected3);
-                
-                Assert.assertEquals(3, actual.asMap().size());
-                for(Long ident: ids) {
-                    Assert.assertEquals(expected.get(ident), 
actual.get(ident));
-                }
-            }
-            
-            Set<BindingSet> expectedResults = new HashSet<>();
-            try (CloseableIterator<BindingSet> results = 
storage.listResults(id, Optional.empty())) {
-                results.forEachRemaining(x -> expectedResults.add(x));
-                Assert.assertEquals(0, expectedResults.size());
-            }
-        }
-    }
-    
-    
-    @Test
-    public void periodicApplicationWithAggTest() throws Exception {
-
-        String sparql = "prefix function: <http://org.apache.rya/function#> " 
// n
-                + "prefix time: <http://www.w3.org/2006/time#> " // n
-                + "select (count(?obs) as ?total) where {" // n
-                + "Filter(function:periodic(?time, 1, .25, time:minutes)) " // 
n
-                + "?obs <uri:hasTime> ?time. " // n
-                + "?obs <uri:hasId> ?id } "; // n
-        
-        //make data
-        int periodMult = 15;
-        final ValueFactory vf = new ValueFactoryImpl();
-        final DatatypeFactory dtf = DatatypeFactory.newInstance();
-        //Sleep until current time aligns nicely with period to make
-        //results more predictable
-        while(System.currentTimeMillis() % (periodMult*1000) > 500);
-        ZonedDateTime time = ZonedDateTime.now();
-
-        ZonedDateTime zTime1 = time.minusSeconds(2*periodMult);
-        String time1 = zTime1.format(DateTimeFormatter.ISO_INSTANT);
-
-        ZonedDateTime zTime2 = zTime1.minusSeconds(periodMult);
-        String time2 = zTime2.format(DateTimeFormatter.ISO_INSTANT);
-
-        ZonedDateTime zTime3 = zTime2.minusSeconds(periodMult);
-        String time3 = zTime3.format(DateTimeFormatter.ISO_INSTANT);
-
-        final Collection<Statement> statements = Sets.newHashSet(
-                vf.createStatement(vf.createURI("urn:obs_1"), 
vf.createURI("uri:hasTime"),
-                        vf.createLiteral(dtf.newXMLGregorianCalendar(time1))),
-                vf.createStatement(vf.createURI("urn:obs_1"), 
vf.createURI("uri:hasId"), vf.createLiteral("id_1")),
-                vf.createStatement(vf.createURI("urn:obs_2"), 
vf.createURI("uri:hasTime"),
-                        vf.createLiteral(dtf.newXMLGregorianCalendar(time2))),
-                vf.createStatement(vf.createURI("urn:obs_2"), 
vf.createURI("uri:hasId"), vf.createLiteral("id_2")),
-                vf.createStatement(vf.createURI("urn:obs_3"), 
vf.createURI("uri:hasTime"),
-                        vf.createLiteral(dtf.newXMLGregorianCalendar(time3))),
-                vf.createStatement(vf.createURI("urn:obs_3"), 
vf.createURI("uri:hasId"), vf.createLiteral("id_3")));
-        
-        try (FluoClient fluo = 
FluoClientFactory.getFluoClient(conf.getFluoAppName(), 
Optional.of(conf.getFluoTableName()), conf)) {
-            Connector connector = ConfigUtils.getConnector(conf);
-            PeriodicQueryResultStorage storage = new 
AccumuloPeriodicQueryResultStorage(connector, conf.getTablePrefix());
-            CreatePeriodicQuery periodicQuery = new CreatePeriodicQuery(fluo, 
storage);
-            String id = 
FluoQueryUtils.convertFluoQueryIdToPcjId(periodicQuery.createPeriodicQuery(sparql,
 registrar).getQueryId());
-            addData(statements);
-            app.start();
-            
-            Multimap<Long, BindingSet> expected = HashMultimap.create();
-            try (KafkaConsumer<String, BindingSet> consumer = new 
KafkaConsumer<>(kafkaProps, new StringDeserializer(), new BindingSetSerDe())) {
-                consumer.subscribe(Arrays.asList(id));
-                long end = System.currentTimeMillis() + 4*periodMult*1000;
-                long lastBinId = 0L;
-                long binId = 0L;
-                List<Long> ids = new ArrayList<>();
-                while (System.currentTimeMillis() < end) {
-                    ConsumerRecords<String, BindingSet> records = 
consumer.poll(periodMult*1000);
-                    for(ConsumerRecord<String, BindingSet> record: records){
-                        BindingSet result = record.value();
-                        binId = 
Long.parseLong(result.getBinding(IncrementalUpdateConstants.PERIODIC_BIN_ID).getValue().stringValue());
-                        if(lastBinId != binId) {
-                            lastBinId = binId;
-                            ids.add(binId);
-                        }
-                        expected.put(binId, result);
-                    }
-                }
-                
-                Assert.assertEquals(3, expected.asMap().size());
-                int i = 0;
-                for(Long ident: ids) {
-                    Assert.assertEquals(1, expected.get(ident).size());
-                    BindingSet bs = expected.get(ident).iterator().next();
-                    Value val = bs.getValue("total");
-                    int total = Integer.parseInt(val.stringValue());
-                    Assert.assertEquals(3-i, total);
-                    i++;
-                }
-            }
-            
-            
-            Set<BindingSet> expectedResults = new HashSet<>();
-            try (CloseableIterator<BindingSet> results = 
storage.listResults(id, Optional.empty())) {
-                results.forEachRemaining(x -> expectedResults.add(x));
-                Assert.assertEquals(0, expectedResults.size());
-            }
-        }
-
-    }
-    
-    
-    @Test
-    public void periodicApplicationTest() throws Exception {
-
-        String sparql = "prefix function: <http://org.apache.rya/function#> " 
// n
-                + "prefix time: <http://www.w3.org/2006/time#> " // n
-                + "select ?obs ?id where {" // n
-                + "Filter(function:periodic(?time, 1, .25, time:minutes)) " // 
n
-                + "?obs <uri:hasTime> ?time. " // n
-                + "?obs <uri:hasId> ?id } "; // n
-        
-        //make data
-        int periodMult = 15;
-        final ValueFactory vf = new ValueFactoryImpl();
-        final DatatypeFactory dtf = DatatypeFactory.newInstance();
-        //Sleep until current time aligns nicely with period to make
-        //results more predictable
-        while(System.currentTimeMillis() % (periodMult*1000) > 500);
-        ZonedDateTime time = ZonedDateTime.now();
-
-        ZonedDateTime zTime1 = time.minusSeconds(2*periodMult);
-        String time1 = zTime1.format(DateTimeFormatter.ISO_INSTANT);
-
-        ZonedDateTime zTime2 = zTime1.minusSeconds(periodMult);
-        String time2 = zTime2.format(DateTimeFormatter.ISO_INSTANT);
-
-        ZonedDateTime zTime3 = zTime2.minusSeconds(periodMult);
-        String time3 = zTime3.format(DateTimeFormatter.ISO_INSTANT);
-
-        final Collection<Statement> statements = Sets.newHashSet(
-                vf.createStatement(vf.createURI("urn:obs_1"), 
vf.createURI("uri:hasTime"),
-                        vf.createLiteral(dtf.newXMLGregorianCalendar(time1))),
-                vf.createStatement(vf.createURI("urn:obs_1"), 
vf.createURI("uri:hasId"), vf.createLiteral("id_1")),
-                vf.createStatement(vf.createURI("urn:obs_2"), 
vf.createURI("uri:hasTime"),
-                        vf.createLiteral(dtf.newXMLGregorianCalendar(time2))),
-                vf.createStatement(vf.createURI("urn:obs_2"), 
vf.createURI("uri:hasId"), vf.createLiteral("id_2")),
-                vf.createStatement(vf.createURI("urn:obs_3"), 
vf.createURI("uri:hasTime"),
-                        vf.createLiteral(dtf.newXMLGregorianCalendar(time3))),
-                vf.createStatement(vf.createURI("urn:obs_3"), 
vf.createURI("uri:hasId"), vf.createLiteral("id_3")));
-        
-        try (FluoClient fluo = 
FluoClientFactory.getFluoClient(conf.getFluoAppName(), 
Optional.of(conf.getFluoTableName()), conf)) {
-            Connector connector = ConfigUtils.getConnector(conf);
-            PeriodicQueryResultStorage storage = new 
AccumuloPeriodicQueryResultStorage(connector, conf.getTablePrefix());
-            CreatePeriodicQuery periodicQuery = new CreatePeriodicQuery(fluo, 
storage);
-            String id = 
FluoQueryUtils.convertFluoQueryIdToPcjId(periodicQuery.createPeriodicQuery(sparql,
 registrar).getQueryId());
-            addData(statements);
-            app.start();
-           
-            Multimap<Long, BindingSet> expected = HashMultimap.create();
-            try (KafkaConsumer<String, BindingSet> consumer = new 
KafkaConsumer<>(kafkaProps, new StringDeserializer(), new BindingSetSerDe())) {
-                consumer.subscribe(Arrays.asList(id));
-                long end = System.currentTimeMillis() + 4*periodMult*1000;
-                long lastBinId = 0L;
-                long binId = 0L;
-                List<Long> ids = new ArrayList<>();
-                while (System.currentTimeMillis() < end) {
-                    ConsumerRecords<String, BindingSet> records = 
consumer.poll(periodMult*1000);
-                    for(ConsumerRecord<String, BindingSet> record: records){
-                        BindingSet result = record.value();
-                        binId = 
Long.parseLong(result.getBinding(IncrementalUpdateConstants.PERIODIC_BIN_ID).getValue().stringValue());
-                        if(lastBinId != binId) {
-                            lastBinId = binId;
-                            ids.add(binId);
-                        }
-                        expected.put(binId, result);
-                    }
-                }
-                
-                Assert.assertEquals(3, expected.asMap().size());
-                int i = 0;
-                for(Long ident: ids) {
-                    Assert.assertEquals(3-i, expected.get(ident).size());
-                    i++;
-                }
-            }
-            
-            
-            Set<BindingSet> expectedResults = new HashSet<>();
-            try (CloseableIterator<BindingSet> results = 
storage.listResults(id, Optional.empty())) {
-                results.forEachRemaining(x -> expectedResults.add(x));
-                Assert.assertEquals(0, expectedResults.size());
-            }
-        }
-
-    }
-    
-    
-    @After
-    public void shutdown() {
-        registrar.close();
-        app.stop();
-    }
-    
-    private void addData(Collection<Statement> statements) throws DatatypeConfigurationException {
-        // add statements to Fluo
-        try (FluoClient fluo = new FluoClientImpl(getFluoConfiguration())) {
-            InsertTriples inserter = new InsertTriples();
-            statements.forEach(x -> inserter.insert(fluo, RdfToRyaConversions.convertStatement(x)));
-            getMiniFluo().waitForObservers();
-        }
-    }
-
-    private static Properties getKafkaProperties(PeriodicNotificationApplicationConfiguration conf) { 
-        Properties kafkaProps = new Properties();
-        kafkaProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
-        kafkaProps.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString());
-        kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, conf.getNotificationGroupId());
-        kafkaProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-        return kafkaProps;
-    }
-    
-    private Properties getProps() throws IOException {
-        
-        Properties props = new Properties();
-        try(InputStream in = new FileInputStream("src/test/resources/notification.properties")) {
-            props.load(in);
-        } 
-        
-        FluoConfiguration fluoConf = getFluoConfiguration();
-        props.setProperty("accumulo.user", getUsername());
-        props.setProperty("accumulo.password", getPassword());
-        props.setProperty("accumulo.instance", getMiniAccumuloCluster().getInstanceName());
-        props.setProperty("accumulo.zookeepers", getMiniAccumuloCluster().getZooKeepers());
-        props.setProperty("accumulo.rya.prefix", getRyaInstanceName());
-        props.setProperty(PeriodicNotificationApplicationConfiguration.FLUO_APP_NAME, fluoConf.getApplicationName());
-        props.setProperty(PeriodicNotificationApplicationConfiguration.FLUO_TABLE_NAME, fluoConf.getAccumuloTable());
-        return props;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/application/PeriodicNotificationProviderIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/application/PeriodicNotificationProviderIT.java b/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/application/PeriodicNotificationProviderIT.java
deleted file mode 100644
index e05ca6f..0000000
--- a/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/application/PeriodicNotificationProviderIT.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.rya.periodic.notification.application;
-
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.fluo.api.client.FluoClient;
-import org.apache.fluo.core.client.FluoClientImpl;
-import org.apache.fluo.recipes.test.AccumuloExportITBase;
-import org.apache.rya.indexing.pcj.fluo.api.CreateFluoPcj;
-import org.apache.rya.indexing.pcj.fluo.app.query.UnsupportedQueryException;
-import org.apache.rya.indexing.pcj.fluo.app.util.FluoQueryUtils;
-import org.apache.rya.periodic.notification.coordinator.PeriodicNotificationCoordinatorExecutor;
-import org.apache.rya.periodic.notification.notification.TimestampedNotification;
-import org.apache.rya.periodic.notification.recovery.PeriodicNotificationProvider;
-import org.junit.Assert;
-import org.junit.Test;
-import org.openrdf.query.MalformedQueryException;
-
-import com.google.common.collect.Sets;
-
-public class PeriodicNotificationProviderIT extends AccumuloExportITBase {
-
-    @Test
-    public void testProvider() throws MalformedQueryException, InterruptedException, UnsupportedQueryException {
-        
-        String sparql = "prefix function: <http://org.apache.rya/function#> " // n
-                + "prefix time: <http://www.w3.org/2006/time#> " // n
-                + "select ?id (count(?obs) as ?total) where {" // n
-                + "Filter(function:periodic(?time, 1, .25, time:minutes)) " // n
-                + "?obs <uri:hasTime> ?time. " // n
-                + "?obs <uri:hasId> ?id } group by ?id"; // n
-        
-        BlockingQueue<TimestampedNotification> notifications = new LinkedBlockingQueue<>();
-        PeriodicNotificationCoordinatorExecutor coord = new PeriodicNotificationCoordinatorExecutor(2, notifications);
-        PeriodicNotificationProvider provider = new PeriodicNotificationProvider();
-        CreateFluoPcj pcj = new CreateFluoPcj();
-        
-        String id = null;
-        try(FluoClient fluo = new FluoClientImpl(getFluoConfiguration())) {
-            id = pcj.createPcj(FluoQueryUtils.createNewPcjId(), sparql, Sets.newHashSet(), fluo).getQueryId();
-            provider.processRegisteredNotifications(coord, fluo.newSnapshot());
-        }
-        
-        TimestampedNotification notification = notifications.take();
-        Assert.assertEquals(5000, notification.getInitialDelay());
-        Assert.assertEquals(15000, notification.getPeriod());
-        Assert.assertEquals(TimeUnit.MILLISECONDS, notification.getTimeUnit());
-        Assert.assertEquals(FluoQueryUtils.convertFluoQueryIdToPcjId(id), notification.getId());
-        
-    }
-    
-}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/exporter/PeriodicNotificationExporterIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/exporter/PeriodicNotificationExporterIT.java b/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/exporter/PeriodicNotificationExporterIT.java
deleted file mode 100644
index 874e7e2..0000000
--- a/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/exporter/PeriodicNotificationExporterIT.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.rya.periodic.notification.exporter;
-
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.Properties;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
-import org.apache.rya.kafka.base.KafkaITBase;
-import org.apache.rya.kafka.base.KafkaTestInstanceRule;
-import org.apache.rya.periodic.notification.api.BindingSetRecord;
-import org.apache.rya.periodic.notification.serialization.BindingSetSerDe;
-import org.junit.Assert;
-import org.junit.Rule;
-import org.junit.Test;
-import org.openrdf.model.ValueFactory;
-import org.openrdf.model.impl.ValueFactoryImpl;
-import org.openrdf.query.BindingSet;
-import org.openrdf.query.algebra.evaluation.QueryBindingSet;
-
-public class PeriodicNotificationExporterIT extends KafkaITBase {
-
-
-    @Rule
-    public KafkaTestInstanceRule kafkaTestInstanceRule = new 
KafkaTestInstanceRule(false);
-
-
-    private static final ValueFactory vf = new ValueFactoryImpl();
-
-    @Test
-    public void testExporter() throws InterruptedException {
-
-        final String topic1 = kafkaTestInstanceRule.getKafkaTopicName() + "1";
-        final String topic2 = kafkaTestInstanceRule.getKafkaTopicName() + "2";
-
-        kafkaTestInstanceRule.createTopic(topic1);
-        kafkaTestInstanceRule.createTopic(topic2);
-
-        final BlockingQueue<BindingSetRecord> records = new 
LinkedBlockingQueue<>();
-
-        final KafkaExporterExecutor exporter = new KafkaExporterExecutor(new 
KafkaProducer<String, BindingSet>(createKafkaProducerConfig()), 1, records);
-        exporter.start();
-        final QueryBindingSet bs1 = new QueryBindingSet();
-        bs1.addBinding(PeriodicQueryResultStorage.PeriodicBinId, 
vf.createLiteral(1L));
-        bs1.addBinding("name", vf.createURI("uri:Bob"));
-        final BindingSetRecord record1 = new BindingSetRecord(bs1, topic1);
-
-        final QueryBindingSet bs2 = new QueryBindingSet();
-        bs2.addBinding(PeriodicQueryResultStorage.PeriodicBinId, 
vf.createLiteral(2L));
-        bs2.addBinding("name", vf.createURI("uri:Joe"));
-        final BindingSetRecord record2 = new BindingSetRecord(bs2, topic2);
-
-        records.add(record1);
-        records.add(record2);
-
-        final Set<BindingSet> expected1 = new HashSet<>();
-        expected1.add(bs1);
-        final Set<BindingSet> expected2 = new HashSet<>();
-        expected2.add(bs2);
-
-        final Set<BindingSet> actual1 = getBindingSetsFromKafka(topic1);
-        final Set<BindingSet> actual2 = getBindingSetsFromKafka(topic2);
-
-        Assert.assertEquals(expected1, actual1);
-        Assert.assertEquals(expected2, actual2);
-
-        exporter.stop();
-    }
-
-
-    private Properties createKafkaProducerConfig() {
-        final Properties props = createBootstrapServerConfig();
-        props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getName());
-        props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
BindingSetSerDe.class.getName());
-        return props;
-    }
-    private Properties createKafkaConsumerConfig() {
-        final Properties props = createBootstrapServerConfig();
-        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, 
UUID.randomUUID().toString());
-        props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "consumer0");
-        props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class.getName());
-        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
BindingSetSerDe.class.getName());
-        return props;
-    }
-
-
-    private KafkaConsumer<String, BindingSet> makeBindingSetConsumer(final 
String topicName) {
-        // setup consumer
-        final KafkaConsumer<String, BindingSet> consumer = new 
KafkaConsumer<>(createKafkaConsumerConfig());
-        consumer.subscribe(Arrays.asList(topicName));
-        return consumer;
-    }
-
-    private Set<BindingSet> getBindingSetsFromKafka(final String topicName) {
-        KafkaConsumer<String, BindingSet> consumer = null;
-
-        try {
-            consumer = makeBindingSetConsumer(topicName);
-            final ConsumerRecords<String, BindingSet> records = 
consumer.poll(20000);  // Wait up to 20 seconds for a result to be published.
-
-            final Set<BindingSet> bindingSets = new HashSet<>();
-            records.forEach(x -> bindingSets.add(x.value()));
-
-            return bindingSets;
-
-        } catch (final Exception e) {
-            throw new RuntimeException(e);
-        } finally {
-            if (consumer != null) {
-                consumer.close();
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/processor/PeriodicNotificationProcessorIT.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/processor/PeriodicNotificationProcessorIT.java
 
b/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/processor/PeriodicNotificationProcessorIT.java
deleted file mode 100644
index 21109ae..0000000
--- 
a/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/processor/PeriodicNotificationProcessorIT.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.rya.periodic.notification.processor;
-
-import java.util.HashSet;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.fluo.recipes.test.AccumuloExportITBase;
-import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
-import 
org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPeriodicQueryResultStorage;
-import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
-import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
-import org.apache.rya.periodic.notification.api.BindingSetRecord;
-import org.apache.rya.periodic.notification.api.NodeBin;
-import org.apache.rya.periodic.notification.notification.PeriodicNotification;
-import 
org.apache.rya.periodic.notification.notification.TimestampedNotification;
-import org.junit.Assert;
-import org.junit.Test;
-import org.openrdf.model.ValueFactory;
-import org.openrdf.model.impl.ValueFactoryImpl;
-import org.openrdf.query.BindingSet;
-import org.openrdf.query.algebra.evaluation.QueryBindingSet;
-
-public class PeriodicNotificationProcessorIT extends AccumuloExportITBase {
-
-    private static final ValueFactory vf = new ValueFactoryImpl();
-    private static final String RYA_INSTANCE_NAME = "rya_";
-    
-    @Test
-    public void periodicProcessorTest() throws Exception {
-        
-        String id = UUID.randomUUID().toString().replace("-", "");
-        BlockingQueue<TimestampedNotification> notifications = new 
LinkedBlockingQueue<>();
-        BlockingQueue<NodeBin> bins = new LinkedBlockingQueue<>();
-        BlockingQueue<BindingSetRecord> bindingSets = new 
LinkedBlockingQueue<>();
-        
-        TimestampedNotification ts1 = new TimestampedNotification(
-                
PeriodicNotification.builder().id(id).initialDelay(0).period(2000).timeUnit(TimeUnit.SECONDS).build());
  
-        long binId1 = 
(ts1.getTimestamp().getTime()/ts1.getPeriod())*ts1.getPeriod();
-        
-        Thread.sleep(2000);
-        
-        TimestampedNotification ts2 = new TimestampedNotification(
-                
PeriodicNotification.builder().id(id).initialDelay(0).period(2000).timeUnit(TimeUnit.SECONDS).build());
  
-        long binId2 = 
(ts2.getTimestamp().getTime()/ts2.getPeriod())*ts2.getPeriod();
-        
-        Set<NodeBin> expectedBins = new HashSet<>();
-        expectedBins.add(new NodeBin(id, binId1));
-        expectedBins.add(new NodeBin(id, binId2));
-        
-        Set<BindingSet> expected = new HashSet<>();
-        Set<VisibilityBindingSet> storageResults = new HashSet<>();
-        
-        QueryBindingSet bs1 = new QueryBindingSet();
-        bs1.addBinding("periodicBinId", vf.createLiteral(binId1));
-        bs1.addBinding("id", vf.createLiteral(1));
-        expected.add(bs1);
-        storageResults.add(new VisibilityBindingSet(bs1));
-        
-        QueryBindingSet bs2 = new QueryBindingSet();
-        bs2.addBinding("periodicBinId", vf.createLiteral(binId1));
-        bs2.addBinding("id", vf.createLiteral(2));
-        expected.add(bs2);
-        storageResults.add(new VisibilityBindingSet(bs2));
-        
-        QueryBindingSet bs3 = new QueryBindingSet();
-        bs3.addBinding("periodicBinId", vf.createLiteral(binId2));
-        bs3.addBinding("id", vf.createLiteral(3));
-        expected.add(bs3);
-        storageResults.add(new VisibilityBindingSet(bs3));
-        
-        QueryBindingSet bs4 = new QueryBindingSet();
-        bs4.addBinding("periodicBinId", vf.createLiteral(binId2));
-        bs4.addBinding("id", vf.createLiteral(4));
-        expected.add(bs4);
-        storageResults.add(new VisibilityBindingSet(bs4));
-        
-        PeriodicQueryResultStorage periodicStorage = new 
AccumuloPeriodicQueryResultStorage(super.getAccumuloConnector(),
-                RYA_INSTANCE_NAME);
-        periodicStorage.createPeriodicQuery(id, "select ?id where {?obs 
<urn:hasId> ?id.}", new VariableOrder("periodicBinId", "id"));
-        periodicStorage.addPeriodicQueryResults(id, storageResults);
-
-        NotificationProcessorExecutor processor = new 
NotificationProcessorExecutor(periodicStorage, notifications, bins, 
bindingSets, 1);
-        processor.start();
-        
-        notifications.add(ts1);
-        notifications.add(ts2);
-
-        Thread.sleep(5000);
-        
-        Assert.assertEquals(expectedBins.size(), bins.size());
-        Assert.assertEquals(true, bins.containsAll(expectedBins));
-        
-        Set<BindingSet> actual = new HashSet<>();
-        bindingSets.forEach(x -> actual.add(x.getBindingSet()));
-        Assert.assertEquals(expected, actual);
-        
-        processor.stop();
-    }
-    
-}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/pruner/PeriodicNotificationBinPrunerIT.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/pruner/PeriodicNotificationBinPrunerIT.java
 
b/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/pruner/PeriodicNotificationBinPrunerIT.java
deleted file mode 100644
index 830fa46..0000000
--- 
a/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/pruner/PeriodicNotificationBinPrunerIT.java
+++ /dev/null
@@ -1,283 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.rya.periodic.notification.pruner;
-
-import java.time.ZonedDateTime;
-import java.time.format.DateTimeFormatter;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Optional;
-import java.util.Set;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import javax.xml.datatype.DatatypeFactory;
-
-import org.apache.fluo.api.client.FluoClient;
-import org.apache.fluo.api.client.Snapshot;
-import org.apache.fluo.api.client.scanner.ColumnScanner;
-import org.apache.fluo.api.client.scanner.RowScanner;
-import org.apache.fluo.api.data.Bytes;
-import org.apache.fluo.api.data.ColumnValue;
-import org.apache.fluo.api.data.Span;
-import org.apache.fluo.core.client.FluoClientImpl;
-import org.apache.rya.api.resolver.RdfToRyaConversions;
-import org.apache.rya.indexing.pcj.fluo.api.CreatePeriodicQuery;
-import org.apache.rya.indexing.pcj.fluo.api.InsertTriples;
-import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants;
-import org.apache.rya.indexing.pcj.fluo.app.NodeType;
-import org.apache.rya.indexing.pcj.fluo.app.util.FluoQueryUtils;
-import org.apache.rya.indexing.pcj.fluo.app.util.PeriodicQueryUtil;
-import org.apache.rya.indexing.pcj.fluo.app.util.RowKeyUtil;
-import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
-import org.apache.rya.indexing.pcj.storage.PeriodicQueryStorageException;
-import 
org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator;
-import 
org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPeriodicQueryResultStorage;
-import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
-import org.apache.rya.pcj.fluo.test.base.RyaExportITBase;
-import org.apache.rya.periodic.notification.api.NodeBin;
-import org.junit.Assert;
-import org.junit.Test;
-import org.openrdf.model.Statement;
-import org.openrdf.model.ValueFactory;
-import org.openrdf.model.impl.LiteralImpl;
-import org.openrdf.model.impl.ValueFactoryImpl;
-import org.openrdf.model.vocabulary.XMLSchema;
-import org.openrdf.query.BindingSet;
-import org.openrdf.query.algebra.evaluation.QueryBindingSet;
-import org.openrdf.query.impl.MapBindingSet;
-
-import com.google.common.collect.Sets;
-
-public class PeriodicNotificationBinPrunerIT extends RyaExportITBase {
-
-    
-    @Test
-    public void periodicPrunerTest() throws Exception {
-
-        String sparql = "prefix function: <http://org.apache.rya/function#> " 
// n
-                + "prefix time: <http://www.w3.org/2006/time#> " // n
-                + "select ?id (count(?obs) as ?total) where {" // n
-                + "Filter(function:periodic(?time, 2, .5, time:hours)) " // n
-                + "?obs <uri:hasTime> ?time. " // n
-                + "?obs <uri:hasId> ?id } group by ?id"; // n
-
-        FluoClient fluo = new FluoClientImpl(super.getFluoConfiguration());
-
-        // initialize resources and create pcj
-        PeriodicQueryResultStorage periodicStorage = new 
AccumuloPeriodicQueryResultStorage(super.getAccumuloConnector(),
-                getRyaInstanceName());
-        CreatePeriodicQuery createPeriodicQuery = new 
CreatePeriodicQuery(fluo, periodicStorage);
-        String queryId = 
FluoQueryUtils.convertFluoQueryIdToPcjId(createPeriodicQuery.createPeriodicQuery(sparql).getQueryId());
-
-        // create statements to ingest into Fluo
-        final ValueFactory vf = new ValueFactoryImpl();
-        final DatatypeFactory dtf = DatatypeFactory.newInstance();
-        ZonedDateTime time = ZonedDateTime.now();
-        long currentTime = time.toInstant().toEpochMilli();
-
-        ZonedDateTime zTime1 = time.minusMinutes(30);
-        String time1 = zTime1.format(DateTimeFormatter.ISO_INSTANT);
-
-        ZonedDateTime zTime2 = zTime1.minusMinutes(30);
-        String time2 = zTime2.format(DateTimeFormatter.ISO_INSTANT);
-
-        ZonedDateTime zTime3 = zTime2.minusMinutes(30);
-        String time3 = zTime3.format(DateTimeFormatter.ISO_INSTANT);
-
-        ZonedDateTime zTime4 = zTime3.minusMinutes(30);
-        String time4 = zTime4.format(DateTimeFormatter.ISO_INSTANT);
-
-        final Collection<Statement> statements = Sets.newHashSet(
-                vf.createStatement(vf.createURI("urn:obs_1"), 
vf.createURI("uri:hasTime"),
-                        vf.createLiteral(dtf.newXMLGregorianCalendar(time1))),
-                vf.createStatement(vf.createURI("urn:obs_1"), 
vf.createURI("uri:hasId"), vf.createLiteral("id_1")),
-                vf.createStatement(vf.createURI("urn:obs_2"), 
vf.createURI("uri:hasTime"),
-                        vf.createLiteral(dtf.newXMLGregorianCalendar(time2))),
-                vf.createStatement(vf.createURI("urn:obs_2"), 
vf.createURI("uri:hasId"), vf.createLiteral("id_2")),
-                vf.createStatement(vf.createURI("urn:obs_3"), 
vf.createURI("uri:hasTime"),
-                        vf.createLiteral(dtf.newXMLGregorianCalendar(time3))),
-                vf.createStatement(vf.createURI("urn:obs_3"), 
vf.createURI("uri:hasId"), vf.createLiteral("id_3")),
-                vf.createStatement(vf.createURI("urn:obs_4"), 
vf.createURI("uri:hasTime"),
-                        vf.createLiteral(dtf.newXMLGregorianCalendar(time4))),
-                vf.createStatement(vf.createURI("urn:obs_4"), 
vf.createURI("uri:hasId"), vf.createLiteral("id_4")),
-                vf.createStatement(vf.createURI("urn:obs_1"), 
vf.createURI("uri:hasTime"),
-                        vf.createLiteral(dtf.newXMLGregorianCalendar(time4))),
-                vf.createStatement(vf.createURI("urn:obs_1"), 
vf.createURI("uri:hasId"), vf.createLiteral("id_1")),
-                vf.createStatement(vf.createURI("urn:obs_2"), 
vf.createURI("uri:hasTime"),
-                        vf.createLiteral(dtf.newXMLGregorianCalendar(time3))),
-                vf.createStatement(vf.createURI("urn:obs_2"), 
vf.createURI("uri:hasId"), vf.createLiteral("id_2")));
-
-        // add statements to Fluo
-        InsertTriples inserter = new InsertTriples();
-        statements.forEach(x -> inserter.insert(fluo, 
RdfToRyaConversions.convertStatement(x)));
-
-        super.getMiniFluo().waitForObservers();
-
-        // FluoITHelper.printFluoTable(fluo);
-
-        // Create the expected results of the SPARQL query once the PCJ has 
been
-        // computed.
-        final Set<BindingSet> expected1 = new HashSet<>();
-        final Set<BindingSet> expected2 = new HashSet<>();
-        final Set<BindingSet> expected3 = new HashSet<>();
-        final Set<BindingSet> expected4 = new HashSet<>();
-
-        long period = 1800000;
-        long binId = (currentTime / period) * period;
-
-        long bin1 = binId;
-        long bin2 = binId + period;
-        long bin3 = binId + 2 * period;
-        long bin4 = binId + 3 * period;
-
-        MapBindingSet bs = new MapBindingSet();
-        bs.addBinding("total", vf.createLiteral("2", XMLSchema.INTEGER));
-        bs.addBinding("id", vf.createLiteral("id_1", XMLSchema.STRING));
-        bs.addBinding("periodicBinId", vf.createLiteral(bin1));
-        expected1.add(bs);
-
-        bs = new MapBindingSet();
-        bs.addBinding("total", vf.createLiteral("2", XMLSchema.INTEGER));
-        bs.addBinding("id", vf.createLiteral("id_2", XMLSchema.STRING));
-        bs.addBinding("periodicBinId", vf.createLiteral(bin1));
-        expected1.add(bs);
-
-        bs = new MapBindingSet();
-        bs.addBinding("total", vf.createLiteral("1", XMLSchema.INTEGER));
-        bs.addBinding("id", vf.createLiteral("id_3", XMLSchema.STRING));
-        bs.addBinding("periodicBinId", vf.createLiteral(bin1));
-        expected1.add(bs);
-
-        bs = new MapBindingSet();
-        bs.addBinding("total", vf.createLiteral("1", XMLSchema.INTEGER));
-        bs.addBinding("id", vf.createLiteral("id_4", XMLSchema.STRING));
-        bs.addBinding("periodicBinId", vf.createLiteral(bin1));
-        expected1.add(bs);
-
-        bs = new MapBindingSet();
-        bs.addBinding("total", vf.createLiteral("1", XMLSchema.INTEGER));
-        bs.addBinding("id", vf.createLiteral("id_1", XMLSchema.STRING));
-        bs.addBinding("periodicBinId", vf.createLiteral(bin2));
-        expected2.add(bs);
-
-        bs = new MapBindingSet();
-        bs.addBinding("total", vf.createLiteral("2", XMLSchema.INTEGER));
-        bs.addBinding("id", vf.createLiteral("id_2", XMLSchema.STRING));
-        bs.addBinding("periodicBinId", vf.createLiteral(bin2));
-        expected2.add(bs);
-
-        bs = new MapBindingSet();
-        bs.addBinding("total", vf.createLiteral("1", XMLSchema.INTEGER));
-        bs.addBinding("id", vf.createLiteral("id_3", XMLSchema.STRING));
-        bs.addBinding("periodicBinId", vf.createLiteral(bin2));
-        expected2.add(bs);
-
-        bs = new MapBindingSet();
-        bs.addBinding("total", vf.createLiteral("1", XMLSchema.INTEGER));
-        bs.addBinding("id", vf.createLiteral("id_1", XMLSchema.STRING));
-        bs.addBinding("periodicBinId", vf.createLiteral(bin3));
-        expected3.add(bs);
-
-        bs = new MapBindingSet();
-        bs.addBinding("total", vf.createLiteral("1", XMLSchema.INTEGER));
-        bs.addBinding("id", vf.createLiteral("id_2", XMLSchema.STRING));
-        bs.addBinding("periodicBinId", vf.createLiteral(bin3));
-        expected3.add(bs);
-
-        bs = new MapBindingSet();
-        bs.addBinding("total", vf.createLiteral("1", XMLSchema.INTEGER));
-        bs.addBinding("id", vf.createLiteral("id_1", XMLSchema.STRING));
-        bs.addBinding("periodicBinId", vf.createLiteral(bin4));
-        expected4.add(bs);
-
-        // make sure that expected and actual results align after ingest
-        compareResults(periodicStorage, queryId, bin1, expected1);
-        compareResults(periodicStorage, queryId, bin2, expected2);
-        compareResults(periodicStorage, queryId, bin3, expected3);
-        compareResults(periodicStorage, queryId, bin4, expected4);
-
-        BlockingQueue<NodeBin> bins = new LinkedBlockingQueue<>();
-        PeriodicQueryPrunerExecutor pruner = new 
PeriodicQueryPrunerExecutor(periodicStorage, fluo, 1, bins);
-        pruner.start();
-
-        bins.add(new NodeBin(queryId, bin1));
-        bins.add(new NodeBin(queryId, bin2));
-        bins.add(new NodeBin(queryId, bin3));
-        bins.add(new NodeBin(queryId, bin4));
-
-        Thread.sleep(10000);
-
-        compareResults(periodicStorage, queryId, bin1, new HashSet<>());
-        compareResults(periodicStorage, queryId, bin2, new HashSet<>());
-        compareResults(periodicStorage, queryId, bin3, new HashSet<>());
-        compareResults(periodicStorage, queryId, bin4, new HashSet<>());
-
-        compareFluoCounts(fluo, queryId, bin1);
-        compareFluoCounts(fluo, queryId, bin2);
-        compareFluoCounts(fluo, queryId, bin3);
-        compareFluoCounts(fluo, queryId, bin4);
-
-        pruner.stop();
-
-    }
-    
-    private void compareResults(PeriodicQueryResultStorage periodicStorage, 
String queryId, long bin, Set<BindingSet> expected) throws 
PeriodicQueryStorageException, Exception {
-        try(CloseableIterator<BindingSet> iter = 
periodicStorage.listResults(queryId, Optional.of(bin))) {
-            Set<BindingSet> actual = new HashSet<>();
-            while(iter.hasNext()) {
-                actual.add(iter.next());
-            }
-            Assert.assertEquals(expected, actual);
-        }
-    }
-    
-    private void compareFluoCounts(FluoClient client, String pcjId, long bin) {
-        QueryBindingSet bs = new QueryBindingSet();
-        bs.addBinding(IncrementalUpdateConstants.PERIODIC_BIN_ID, new 
LiteralImpl(Long.toString(bin), XMLSchema.LONG));
-        
-        VariableOrder varOrder = new 
VariableOrder(IncrementalUpdateConstants.PERIODIC_BIN_ID);
-        
-        try(Snapshot sx = client.newSnapshot()) {
-            String fluoQueryId = NodeType.generateNewIdForType(NodeType.QUERY, 
pcjId);
-            Set<String> ids = new HashSet<>();
-            PeriodicQueryUtil.getPeriodicQueryNodeAncestorIds(sx, fluoQueryId, 
ids);
-            for(String id: ids) {
-                NodeType optNode = NodeType.fromNodeId(id).orNull();
-                if(optNode == null) throw new RuntimeException("Invalid 
NodeType.");
-                Bytes prefix = RowKeyUtil.makeRowKey(id,varOrder, bs);
-                RowScanner scanner = 
sx.scanner().fetch(optNode.getResultColumn()).over(Span.prefix(prefix)).byRow().build();
-                int count = 0;
-                Iterator<ColumnScanner> colScannerIter = scanner.iterator();
-                while(colScannerIter.hasNext()) {
-                    ColumnScanner colScanner = colScannerIter.next();
-                    String row = colScanner.getRow().toString();
-                    Iterator<ColumnValue> values = colScanner.iterator();
-                    while(values.hasNext()) {
-                        values.next();
-                        count++;
-                    }
-                }
-                Assert.assertEquals(0, count);
-            }
-        }
-    }
-    
-}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/registration/kafka/PeriodicCommandNotificationConsumerIT.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/registration/kafka/PeriodicCommandNotificationConsumerIT.java
 
b/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/registration/kafka/PeriodicCommandNotificationConsumerIT.java
deleted file mode 100644
index 522e69d..0000000
--- 
a/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/registration/kafka/PeriodicCommandNotificationConsumerIT.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */package org.apache.rya.periodic.notification.registration.kafka;
-
-import java.util.Properties;
-import java.util.UUID;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.kafka.clients.CommonClientConfigs;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.log4j.BasicConfigurator;
-import org.apache.rya.kafka.base.KafkaITBase;
-import org.apache.rya.kafka.base.KafkaTestInstanceRule;
-import 
org.apache.rya.periodic.notification.coordinator.PeriodicNotificationCoordinatorExecutor;
-import org.apache.rya.periodic.notification.notification.CommandNotification;
-import 
org.apache.rya.periodic.notification.notification.TimestampedNotification;
-import 
org.apache.rya.periodic.notification.registration.KafkaNotificationRegistrationClient;
-import 
org.apache.rya.periodic.notification.serialization.CommandNotificationSerializer;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-
-public class PeriodicCommandNotificationConsumerIT extends KafkaITBase {
-
-    private KafkaNotificationRegistrationClient registration;
-    private PeriodicNotificationCoordinatorExecutor coord;
-    private KafkaNotificationProvider provider;
-    private String bootstrapServer;
-    
-    @Rule
-    public KafkaTestInstanceRule rule = new KafkaTestInstanceRule(false);
-    
-    @Before
-    public void init() throws Exception {
-        bootstrapServer = 
createBootstrapServerConfig().getProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG);
-    }
-
-    @Test
-    public void kafkaNotificationProviderTest() throws InterruptedException {
-
-        BasicConfigurator.configure();
-
-        BlockingQueue<TimestampedNotification> notifications = new 
LinkedBlockingQueue<>();
-        Properties props = createKafkaConfig();
-        KafkaProducer<String, CommandNotification> producer = new 
KafkaProducer<>(props);
-        String topic = rule.getKafkaTopicName();
-        rule.createTopic(topic);
-        
-        registration = new KafkaNotificationRegistrationClient(topic, 
producer);
-        coord = new PeriodicNotificationCoordinatorExecutor(1, notifications);
-        provider = new KafkaNotificationProvider(topic, new 
StringDeserializer(), new CommandNotificationSerializer(), props, coord, 1);
-        provider.start();
-
-        registration.addNotification("1", 1, 0, TimeUnit.SECONDS);
-        Thread.sleep(4000);
-        // check that notifications are being added to the blocking queue
-        Assert.assertEquals(true, notifications.size() > 0);
-
-        registration.deleteNotification("1");
-        Thread.sleep(2000);
-        int size = notifications.size();
-        // sleep for 2 seconds to ensure no more messages being produced
-        Thread.sleep(2000);
-        Assert.assertEquals(size, notifications.size());
-        
-        tearDown();
-    }
-
-    @Test
-    public void kafkaNotificationMillisProviderTest() throws 
InterruptedException {
-
-        BasicConfigurator.configure();
-
-        BlockingQueue<TimestampedNotification> notifications = new 
LinkedBlockingQueue<>();
-        Properties props = createKafkaConfig();
-        KafkaProducer<String, CommandNotification> producer = new 
KafkaProducer<>(props);
-        String topic = rule.getKafkaTopicName();
-        rule.createTopic(topic);
-        
-        registration = new KafkaNotificationRegistrationClient(topic, 
producer);
-        coord = new PeriodicNotificationCoordinatorExecutor(1, notifications);
-        provider = new KafkaNotificationProvider(topic, new 
StringDeserializer(), new CommandNotificationSerializer(), props, coord, 1);
-        provider.start();
-
-        registration.addNotification("1", 1000, 0, TimeUnit.MILLISECONDS);
-        Thread.sleep(4000);
-        // check that notifications are being added to the blocking queue
-        Assert.assertEquals(true, notifications.size() > 0);
-
-        registration.deleteNotification("1");
-        Thread.sleep(2000);
-        int size = notifications.size();
-        // sleep for 2 seconds to ensure no more messages being produced
-        Thread.sleep(2000);
-        Assert.assertEquals(size, notifications.size());
-        
-        tearDown();
-    }
-
-    private void tearDown() {
-        registration.close();
-        provider.stop();
-        coord.stop();
-    }
-
-    private Properties createKafkaConfig() {
-        Properties props = new Properties();
-        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
bootstrapServer);
-        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, 
UUID.randomUUID().toString());
-        props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "consumer0");
-        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-        props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getName());
-        props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
CommandNotificationSerializer.class.getName());
-
-        return props;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/rya.periodic.service/periodic.service.integration.tests/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git 
a/extras/rya.periodic.service/periodic.service.integration.tests/src/test/resources/log4j.properties
 
b/extras/rya.periodic.service/periodic.service.integration.tests/src/test/resources/log4j.properties
deleted file mode 100644
index 19cc13c..0000000
--- 
a/extras/rya.periodic.service/periodic.service.integration.tests/src/test/resources/log4j.properties
+++ /dev/null
@@ -1,37 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-# Valid levels:
-# TRACE, DEBUG, INFO, WARN, ERROR and FATAL
-log4j.rootLogger=INFO, CONSOLE
-
-# Set independent logging levels
-log4j.logger.org.apache.zookeeper=WARN
-log4j.logger.kafka=WARN
-log4j.logger.org.apache.kafka=WARN
-
-# CONSOLE is set to be a ConsoleAppender using a PatternLayout.
-log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
-#log4j.appender.CONSOLE.Threshold=DEBUG
-
-log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
-log4j.appender.CONSOLE.layout.ConversionPattern=%d [%t] %-5p %c - %m%n
-
-#log4j.appender.CONSOLE.layout=org.apache.log4j.EnhancedPatternLayout
-#log4j.appender.CONSOLE.layout.ConversionPattern=%d [%t] %-5p %c{1.} - %m%n
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/rya.periodic.service/periodic.service.integration.tests/src/test/resources/notification.properties
----------------------------------------------------------------------
diff --git 
a/extras/rya.periodic.service/periodic.service.integration.tests/src/test/resources/notification.properties
 
b/extras/rya.periodic.service/periodic.service.integration.tests/src/test/resources/notification.properties
deleted file mode 100644
index 4b25b93..0000000
--- 
a/extras/rya.periodic.service/periodic.service.integration.tests/src/test/resources/notification.properties
+++ /dev/null
@@ -1,35 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-accumulo.auths=
-accumulo.instance="instance"
-accumulo.user="root"
-accumulo.password="secret"
-accumulo.rya.prefix="rya_"
-accumulo.zookeepers=
-fluo.app.name="fluo_app"
-fluo.table.name="fluo_table"
-kafka.bootstrap.servers=127.0.0.1:9092
-kafka.notification.topic=notifications
-kafka.notification.client.id=consumer0
-kafka.notification.group.id=group0
-cep.coordinator.threads=1
-cep.producer.threads=1
-cep.exporter.threads=1
-cep.processor.threads=1
-cep.pruner.threads=1
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/rya.periodic.service/periodic.service.notification/pom.xml
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/periodic.service.notification/pom.xml 
b/extras/rya.periodic.service/periodic.service.notification/pom.xml
deleted file mode 100644
index 1e59e15..0000000
--- a/extras/rya.periodic.service/periodic.service.notification/pom.xml
+++ /dev/null
@@ -1,112 +0,0 @@
-<project xmlns="http://maven.apache.org/POM/4.0.0" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd">
-    <modelVersion>4.0.0</modelVersion>
-    <!-- Licensed to the Apache Software Foundation (ASF) under one or more 
-        contributor license agreements. See the NOTICE file distributed with 
this 
-        work for additional information regarding copyright ownership. The ASF 
licenses 
-        this file to you under the Apache License, Version 2.0 (the 
"License"); you 
-        may not use this file except in compliance with the License. You may 
obtain 
-        a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 
Unless 
-        required by applicable law or agreed to in writing, software 
distributed 
-        under the License is distributed on an "AS IS" BASIS, WITHOUT 
WARRANTIES 
-        OR CONDITIONS OF ANY KIND, either express or implied. See the License 
for 
-        the specific language governing permissions and limitations under the 
License. -->
-    <parent>
-        <groupId>org.apache.rya</groupId>
-        <artifactId>rya.periodic.service</artifactId>
-        <version>3.2.11-incubating-SNAPSHOT</version>
-    </parent>
-
-    <artifactId>rya.periodic.service.notification</artifactId>
-
-    <name>Apache Rya Periodic Service Notification</name>
-    <description>Notifications for Rya Periodic Service</description>
-
-    <dependencies>
-
-        <dependency>
-            <groupId>org.apache.twill</groupId>
-            <artifactId>twill-api</artifactId>
-            <version>0.11.0</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.twill</groupId>
-            <artifactId>twill-yarn</artifactId>
-            <version>0.11.0</version>
-            <exclusions>
-                <exclusion>
-                    <artifactId>kafka_2.10</artifactId>
-                    <groupId>org.apache.kafka</groupId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-        <dependency>
-            <groupId>com.google.code.gson</groupId>
-            <artifactId>gson</artifactId>
-            <version>2.8.0</version>
-        </dependency>
-        <dependency>
-            <groupId>junit</groupId>
-            <artifactId>junit</artifactId>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.fluo</groupId>
-            <artifactId>fluo-api</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.fluo</groupId>
-            <artifactId>fluo-core</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.rya</groupId>
-            <artifactId>rya.indexing</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.openrdf.sesame</groupId>
-            <artifactId>sesame-query</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.rya</groupId>
-            <artifactId>rya.indexing.pcj</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.rya</groupId>
-            <artifactId>rya.pcj.fluo.app</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.rya</groupId>
-            <artifactId>rya.periodic.service.api</artifactId>
-        </dependency>
-
-    </dependencies>
-
-    <build>
-        <plugins>
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-compiler-plugin</artifactId>
-                <configuration>
-                    <encoding>UTF-8</encoding>
-                    <source>1.8</source>
-                    <target>1.8</target>
-                </configuration>
-            </plugin>
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-shade-plugin</artifactId>
-                <version>3.0.0</version>
-                <executions>
-                    <execution>
-                        <phase>package</phase>
-                        <goals>
-                            <goal>shade</goal>
-                        </goals>
-                    </execution>
-                </executions>
-            </plugin>
-        </plugins>
-    </build>
-
-
-</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/application/PeriodicApplicationException.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/application/PeriodicApplicationException.java
 
b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/application/PeriodicApplicationException.java
deleted file mode 100644
index b2c3709..0000000
--- 
a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/application/PeriodicApplicationException.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.rya.periodic.notification.application;
-
-/**
- * Exception thrown when attempting to create a {@link 
PeriodicNotificationApplication}.
- * Indicates that a factory was unable to create some component of the 
application 
- * because something was configured incorrectly.
- *
- */
-public class PeriodicApplicationException extends Exception {
-
-    private static final long serialVersionUID = 1L;
-
-    /**
-     * Creates a PeriodicApplicationException.
-     * @param message - message contained in Exception
-     */
-    public PeriodicApplicationException(String message) {
-        super(message);
-    }
-    
-    /**
-     * Creates a PeriodicApplicationException.
-     * @param message - message contained in Exception
-     * @param t - Exception that spawned this PeriodicApplicationException
-     */
-    public PeriodicApplicationException(String message, Throwable t) {
-        super(message, t);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplication.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplication.java
 
b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplication.java
deleted file mode 100644
index 92a7d18..0000000
--- 
a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplication.java
+++ /dev/null
@@ -1,207 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.rya.periodic.notification.application;
-
-import org.apache.log4j.Logger;
-import org.apache.rya.indexing.pcj.fluo.app.util.PeriodicQueryUtil;
-import org.apache.rya.periodic.notification.api.BinPruner;
-import org.apache.rya.periodic.notification.api.BindingSetRecord;
-import org.apache.rya.periodic.notification.api.LifeCycle;
-import org.apache.rya.periodic.notification.api.NodeBin;
-import 
org.apache.rya.periodic.notification.api.NotificationCoordinatorExecutor;
-import org.apache.rya.periodic.notification.exporter.KafkaExporterExecutor;
-import 
org.apache.rya.periodic.notification.processor.NotificationProcessorExecutor;
-import org.apache.rya.periodic.notification.pruner.PeriodicQueryPrunerExecutor;
-import 
org.apache.rya.periodic.notification.registration.kafka.KafkaNotificationProvider;
-import org.openrdf.query.algebra.evaluation.function.Function;
-
-import com.google.common.base.Preconditions;
-
-/**
- * The PeriodicNotificationApplication runs the key components of the Periodic
- * Query Service. It consists of a {@link KafkaNotificationProvider}, a
- * {@link NotificationCoordinatorExecutor}, a
- * {@link NotificationProcessorExecutor}, a {@link KafkaExporterExecutor}, and 
a
- * {@link PeriodicQueryPrunerExecutor}. These services run in coordination with
- * one another to perform the following tasks in the indicated order: <br>
- * <li>Retrieve new requests to generate periodic notifications from Kafka
- * <li>Register them with the {@link NotificationCoordinatorExecutor} to
- * generate the periodic notifications
- * <li>As notifications are generated, they are added to a work queue that is
- * monitored by the {@link NotificationProcessorExecutor}.
- * <li>The processor processes the notifications by reading all of the query
- * results corresponding to the bin and query id indicated by the notification.
- * <li>After reading the results, the processor adds a {@link BindingSetRecord}
- * to a work queue monitored by the {@link KafkaExporterExecutor}.
- * <li>The processor then adds a {@link NodeBin} to a work queue monitored by 
the
- * {@link BinPruner}
- * <li>The exporter processes the BindingSetRecord by exporting the result to
- * Kafka
- * <li>The BinPruner processes the NodeBin by cleaning up the results for the
- * indicated bin and query in Accumulo and Fluo. <br>
- * <br>
- * The purpose of this Periodic Query Service is to facilitate the ability to
- * answer Periodic Queries using the Rya Fluo application, where a Periodic
- * Query is any query requesting periodic updates about events that occurred
- * within a given window of time of this instant. This is also known as a
- * rolling window query. Periodic Queries can be expressed using SPARQL by
- * including the {@link Function} indicated by the URI
- * {@link PeriodicQueryUtil#PeriodicQueryURI}. The user must provide this
- * Function with the following arguments: the temporal variable in the query
- * that will be filtered on, the window of time that events must occur within,
- * the period at which the user wants to receive updates, and the time unit. 
The
- * following query requests all observations that occurred within the last
- * minute and requests updates every 15 seconds. It also performs a count on
- * those observations. <br>
- * <br>
- * <li>prefix function: http://org.apache.rya/function#
- * <li>prefix time: http://www.w3.org/2006/time#
- * <li>select (count(?obs) as ?total) where {
- * <li>Filter(function:periodic(?time, 1, .25, time:minutes))
- * <li>?obs uri:hasTime ?time.
- * <li>?obs uri:hasId ?id }
- * <li>
- */
-public class PeriodicNotificationApplication implements LifeCycle {
-
-    private static final Logger log = 
Logger.getLogger(PeriodicNotificationApplication.class);
-    private NotificationCoordinatorExecutor coordinator;
-    private KafkaNotificationProvider provider;
-    private PeriodicQueryPrunerExecutor pruner;
-    private NotificationProcessorExecutor processor;
-    private KafkaExporterExecutor exporter;
-    private boolean running = false;
-
-    /**
-     * Creates a PeriodicNotificationApplication
-     * @param provider - {@link KafkaNotificationProvider} that retrieves new 
Notification requests from Kafka
-     * @param coordinator - {@link NotificationCoordinatorExecutor} that manages 
PeriodicNotifications.
-     * @param processor - {@link NotificationProcessorExecutor} that processes 
PeriodicNotifications
-     * @param exporter - {@link KafkaExporterExecutor} that exports periodic 
results
-     * @param pruner - {@link PeriodicQueryPrunerExecutor} that cleans up old 
periodic bins
-     */
-    public PeriodicNotificationApplication(KafkaNotificationProvider provider, 
NotificationCoordinatorExecutor coordinator,
-            NotificationProcessorExecutor processor, KafkaExporterExecutor 
exporter, PeriodicQueryPrunerExecutor pruner) {
-        this.provider = Preconditions.checkNotNull(provider);
-        this.coordinator = Preconditions.checkNotNull(coordinator);
-        this.processor = Preconditions.checkNotNull(processor);
-        this.exporter = Preconditions.checkNotNull(exporter);
-        this.pruner = Preconditions.checkNotNull(pruner);
-    }
-
-    @Override
-    public void start() {
-        if (!running) {
-            log.info("Starting PeriodicNotificationApplication.");
-            coordinator.start();
-            provider.start();
-            processor.start();
-            pruner.start();
-            exporter.start();
-            running = true;
-        }
-    }
-
-    @Override
-    public void stop() {
-        log.info("Stopping PeriodicNotificationApplication.");
-        provider.stop();
-        coordinator.stop();
-        processor.stop();
-        pruner.stop();
-        exporter.stop();
-        running = false;
-    }
-
-    /**
-     * @return boolean indicating whether the application is running
-     */
-    @Override
-    public boolean currentlyRunning() {
-        return running;
-    }
-
-    public static Builder builder() {
-        return new Builder();
-    }
-
-    public static class Builder {
-
-        private PeriodicQueryPrunerExecutor pruner;
-        private KafkaNotificationProvider provider;
-        private NotificationProcessorExecutor processor;
-        private KafkaExporterExecutor exporter;
-        private NotificationCoordinatorExecutor coordinator;
-
-        /**
-         * Sets the PeriodicQueryPrunerExecutor.
-         * @param pruner - PeriodicQueryPrunerExecutor for cleaning up old 
periodic bins
-         * @return this Builder for chaining method calls
-         */
-        public Builder setPruner(PeriodicQueryPrunerExecutor pruner) {
-            this.pruner = pruner;
-            return this;
-        }
-
-        /**
-         * Sets the KafkaNotificationProvider
-         * @param provider - KafkaNotificationProvider for retrieving new 
periodic notification requests from Kafka
-         * @return this Builder for chaining method calls
-         */
-        public Builder setProvider(KafkaNotificationProvider provider) {
-            this.provider = provider;
-            return this;
-        }
-
-        public Builder setProcessor(NotificationProcessorExecutor processor) {
-            this.processor = processor;
-            return this;
-        }
-
-        /**
-         * Sets KafkaExporterExecutor
-         * @param exporter for exporting periodic query results to Kafka
-         * @return this Builder for chaining method calls
-         */
-        public Builder setExporter(KafkaExporterExecutor exporter) {
-            this.exporter = exporter;
-            return this;
-        }
-
-        /**
-         * Sets NotificationCoordinatorExecutor
-         * @param coordinator for managing and generating periodic 
notifications
-         * @return this Builder for chaining method calls
-         */
-        public Builder setCoordinator(NotificationCoordinatorExecutor 
coordinator) {
-            this.coordinator = coordinator;
-            return this;
-        }
-
-        /**
-         * Creates a PeriodicNotificationApplication
-         * @return PeriodicNotificationApplication for periodically polling 
Rya Fluo Application
-         */
-        public PeriodicNotificationApplication build() {
-            return new PeriodicNotificationApplication(provider, coordinator, 
processor, exporter, pruner);
-        }
-
-    }
-
-}


Reply via email to