http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationIT.java b/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationIT.java deleted file mode 100644 index 9109775..0000000 --- a/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationIT.java +++ /dev/null @@ -1,493 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.rya.periodic.notification.application; - -import java.io.FileInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.time.ZonedDateTime; -import java.time.format.DateTimeFormatter; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.Properties; -import java.util.Set; -import java.util.UUID; - -import javax.xml.datatype.DatatypeConfigurationException; -import javax.xml.datatype.DatatypeFactory; - -import org.apache.accumulo.core.client.Connector; -import org.apache.fluo.api.client.FluoClient; -import org.apache.fluo.api.config.FluoConfiguration; -import org.apache.fluo.core.client.FluoClientImpl; -import org.apache.kafka.clients.CommonClientConfigs; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.common.serialization.StringDeserializer; -import org.apache.kafka.common.serialization.StringSerializer; -import org.apache.rya.api.resolver.RdfToRyaConversions; -import org.apache.rya.indexing.accumulo.ConfigUtils; -import org.apache.rya.indexing.pcj.fluo.api.CreatePeriodicQuery; -import org.apache.rya.indexing.pcj.fluo.api.InsertTriples; -import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants; -import org.apache.rya.indexing.pcj.fluo.app.util.FluoClientFactory; -import org.apache.rya.indexing.pcj.fluo.app.util.FluoQueryUtils; -import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage; -import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator; -import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPeriodicQueryResultStorage; -import org.apache.rya.kafka.base.EmbeddedKafkaInstance; -import org.apache.rya.kafka.base.EmbeddedKafkaSingleton; -import org.apache.rya.kafka.base.KafkaTestInstanceRule; -import org.apache.rya.pcj.fluo.test.base.RyaExportITBase; -import org.apache.rya.periodic.notification.notification.CommandNotification; -import org.apache.rya.periodic.notification.registration.KafkaNotificationRegistrationClient; -import org.apache.rya.periodic.notification.serialization.BindingSetSerDe; -import org.apache.rya.periodic.notification.serialization.CommandNotificationSerializer; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Rule; -import org.junit.Test; -import org.openrdf.model.Statement; -import org.openrdf.model.Value; -import org.openrdf.model.ValueFactory; -import org.openrdf.model.impl.LiteralImpl; -import org.openrdf.model.impl.ValueFactoryImpl; -import org.openrdf.model.vocabulary.XMLSchema; -import org.openrdf.query.BindingSet; -import org.openrdf.query.algebra.evaluation.QueryBindingSet; - -import com.google.common.collect.HashMultimap; -import com.google.common.collect.Multimap; -import com.google.common.collect.Sets; - -import static org.apache.rya.periodic.notification.application.PeriodicNotificationApplicationConfiguration.NOTIFICATION_TOPIC; -import static org.apache.rya.periodic.notification.application.PeriodicNotificationApplicationConfiguration.KAFKA_BOOTSTRAP_SERVERS;; - - -public class PeriodicNotificationApplicationIT extends RyaExportITBase { - - private PeriodicNotificationApplication app; - private KafkaNotificationRegistrationClient registrar; - private KafkaProducer<String, CommandNotification> producer; - private Properties props; - private Properties kafkaProps; - private PeriodicNotificationApplicationConfiguration conf; - private static EmbeddedKafkaInstance embeddedKafka = EmbeddedKafkaSingleton.getInstance(); - private static String bootstrapServers; - - @Rule - public KafkaTestInstanceRule rule = new KafkaTestInstanceRule(false); - - @BeforeClass - public static void initClass() { - bootstrapServers = embeddedKafka.createBootstrapServerConfig().getProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG); - } - - @Before - public void init() throws Exception { - String topic = rule.getKafkaTopicName(); - rule.createTopic(topic); - - //get user specified props and update with the embedded kafka bootstrap servers and rule generated topic - props = getProps(); - props.setProperty(NOTIFICATION_TOPIC, topic); - props.setProperty(KAFKA_BOOTSTRAP_SERVERS, bootstrapServers); - conf = new PeriodicNotificationApplicationConfiguration(props); - - //create Kafka Producer - kafkaProps = getKafkaProperties(conf); - producer = new KafkaProducer<>(kafkaProps, new StringSerializer(), new CommandNotificationSerializer()); - - //extract kafka specific properties from application config - app = PeriodicNotificationApplicationFactory.getPeriodicApplication(props); - registrar = new KafkaNotificationRegistrationClient(conf.getNotificationTopic(), producer); - } - - @Test - public void periodicApplicationWithAggAndGroupByTest() throws Exception { - - String sparql = "prefix function: <http://org.apache.rya/function#> " // n - + "prefix time: <http://www.w3.org/2006/time#> " // n - + "select ?type (count(?obs) as ?total) where {" // n - + "Filter(function:periodic(?time, 1, .25, time:minutes)) " // n - + "?obs <uri:hasTime> ?time. " // n - + "?obs <uri:hasObsType> ?type } group by ?type"; // n - - //make data - int periodMult = 15; - final ValueFactory vf = new ValueFactoryImpl(); - final DatatypeFactory dtf = DatatypeFactory.newInstance(); - //Sleep until current time aligns nicely with period to makell - //results more predictable - while(System.currentTimeMillis() % (periodMult*1000) > 500); - ZonedDateTime time = ZonedDateTime.now(); - - ZonedDateTime zTime1 = time.minusSeconds(2*periodMult); - String time1 = zTime1.format(DateTimeFormatter.ISO_INSTANT); - - ZonedDateTime zTime2 = zTime1.minusSeconds(periodMult); - String time2 = zTime2.format(DateTimeFormatter.ISO_INSTANT); - - ZonedDateTime zTime3 = zTime2.minusSeconds(periodMult); - String time3 = zTime3.format(DateTimeFormatter.ISO_INSTANT); - - final Collection<Statement> statements = Sets.newHashSet( - vf.createStatement(vf.createURI("urn:obs_1"), vf.createURI("uri:hasTime"), - vf.createLiteral(dtf.newXMLGregorianCalendar(time1))), - vf.createStatement(vf.createURI("urn:obs_1"), vf.createURI("uri:hasObsType"), vf.createLiteral("ship")), - vf.createStatement(vf.createURI("urn:obs_2"), vf.createURI("uri:hasTime"), - vf.createLiteral(dtf.newXMLGregorianCalendar(time1))), - vf.createStatement(vf.createURI("urn:obs_2"), vf.createURI("uri:hasObsType"), vf.createLiteral("airplane")), - vf.createStatement(vf.createURI("urn:obs_3"), vf.createURI("uri:hasTime"), - vf.createLiteral(dtf.newXMLGregorianCalendar(time2))), - vf.createStatement(vf.createURI("urn:obs_3"), vf.createURI("uri:hasObsType"), vf.createLiteral("ship")), - vf.createStatement(vf.createURI("urn:obs_4"), vf.createURI("uri:hasTime"), - vf.createLiteral(dtf.newXMLGregorianCalendar(time2))), - vf.createStatement(vf.createURI("urn:obs_4"), vf.createURI("uri:hasObsType"), vf.createLiteral("airplane")), - vf.createStatement(vf.createURI("urn:obs_5"), vf.createURI("uri:hasTime"), - vf.createLiteral(dtf.newXMLGregorianCalendar(time3))), - vf.createStatement(vf.createURI("urn:obs_5"), vf.createURI("uri:hasObsType"), vf.createLiteral("automobile"))); - - try (FluoClient fluo = FluoClientFactory.getFluoClient(conf.getFluoAppName(), Optional.of(conf.getFluoTableName()), conf)) { - Connector connector = ConfigUtils.getConnector(conf); - PeriodicQueryResultStorage storage = new AccumuloPeriodicQueryResultStorage(connector, conf.getTablePrefix()); - CreatePeriodicQuery periodicQuery = new CreatePeriodicQuery(fluo, storage); - String id = FluoQueryUtils.convertFluoQueryIdToPcjId(periodicQuery.createPeriodicQuery(sparql, registrar).getQueryId()); - addData(statements); - app.start(); - - Multimap<Long, BindingSet> actual = HashMultimap.create(); - try (KafkaConsumer<String, BindingSet> consumer = new KafkaConsumer<>(kafkaProps, new StringDeserializer(), new BindingSetSerDe())) { - consumer.subscribe(Arrays.asList(id)); - long end = System.currentTimeMillis() + 4*periodMult*1000; - long lastBinId = 0L; - long binId = 0L; - List<Long> ids = new ArrayList<>(); - while (System.currentTimeMillis() < end) { - ConsumerRecords<String, BindingSet> records = consumer.poll(periodMult*1000); - for(ConsumerRecord<String, BindingSet> record: records){ - BindingSet result = record.value(); - binId = Long.parseLong(result.getBinding(IncrementalUpdateConstants.PERIODIC_BIN_ID).getValue().stringValue()); - if(lastBinId != binId) { - lastBinId = binId; - ids.add(binId); - } - actual.put(binId, result); - } - } - - Map<Long, Set<BindingSet>> expected = new HashMap<>(); - - Set<BindingSet> expected1 = new HashSet<>(); - QueryBindingSet bs1 = new QueryBindingSet(); - bs1.addBinding(IncrementalUpdateConstants.PERIODIC_BIN_ID, vf.createLiteral(ids.get(0))); - bs1.addBinding("total", new LiteralImpl("2", XMLSchema.INTEGER)); - bs1.addBinding("type", vf.createLiteral("airplane")); - - QueryBindingSet bs2 = new QueryBindingSet(); - bs2.addBinding(IncrementalUpdateConstants.PERIODIC_BIN_ID, vf.createLiteral(ids.get(0))); - bs2.addBinding("total", new LiteralImpl("2", XMLSchema.INTEGER)); - bs2.addBinding("type", vf.createLiteral("ship")); - - QueryBindingSet bs3 = new QueryBindingSet(); - bs3.addBinding(IncrementalUpdateConstants.PERIODIC_BIN_ID, vf.createLiteral(ids.get(0))); - bs3.addBinding("total", new LiteralImpl("1", XMLSchema.INTEGER)); - bs3.addBinding("type", vf.createLiteral("automobile")); - - expected1.add(bs1); - expected1.add(bs2); - expected1.add(bs3); - - Set<BindingSet> expected2 = new HashSet<>(); - QueryBindingSet bs4 = new QueryBindingSet(); - bs4.addBinding(IncrementalUpdateConstants.PERIODIC_BIN_ID, vf.createLiteral(ids.get(1))); - bs4.addBinding("total", new LiteralImpl("2", XMLSchema.INTEGER)); - bs4.addBinding("type", vf.createLiteral("airplane")); - - QueryBindingSet bs5 = new QueryBindingSet(); - bs5.addBinding(IncrementalUpdateConstants.PERIODIC_BIN_ID, vf.createLiteral(ids.get(1))); - bs5.addBinding("total", new LiteralImpl("2", XMLSchema.INTEGER)); - bs5.addBinding("type", vf.createLiteral("ship")); - - expected2.add(bs4); - expected2.add(bs5); - - Set<BindingSet> expected3 = new HashSet<>(); - QueryBindingSet bs6 = new QueryBindingSet(); - bs6.addBinding(IncrementalUpdateConstants.PERIODIC_BIN_ID, vf.createLiteral(ids.get(2))); - bs6.addBinding("total", new LiteralImpl("1", XMLSchema.INTEGER)); - bs6.addBinding("type", vf.createLiteral("ship")); - - QueryBindingSet bs7 = new QueryBindingSet(); - bs7.addBinding(IncrementalUpdateConstants.PERIODIC_BIN_ID, vf.createLiteral(ids.get(2))); - bs7.addBinding("total", new LiteralImpl("1", XMLSchema.INTEGER)); - bs7.addBinding("type", vf.createLiteral("airplane")); - - expected3.add(bs6); - expected3.add(bs7); - - expected.put(ids.get(0), expected1); - expected.put(ids.get(1), expected2); - expected.put(ids.get(2), expected3); - - Assert.assertEquals(3, actual.asMap().size()); - for(Long ident: ids) { - Assert.assertEquals(expected.get(ident), actual.get(ident)); - } - } - - Set<BindingSet> expectedResults = new HashSet<>(); - try (CloseableIterator<BindingSet> results = storage.listResults(id, Optional.empty())) { - results.forEachRemaining(x -> expectedResults.add(x)); - Assert.assertEquals(0, expectedResults.size()); - } - } - } - - - @Test - public void periodicApplicationWithAggTest() throws Exception { - - String sparql = "prefix function: <http://org.apache.rya/function#> " // n - + "prefix time: <http://www.w3.org/2006/time#> " // n - + "select (count(?obs) as ?total) where {" // n - + "Filter(function:periodic(?time, 1, .25, time:minutes)) " // n - + "?obs <uri:hasTime> ?time. " // n - + "?obs <uri:hasId> ?id } "; // n - - //make data - int periodMult = 15; - final ValueFactory vf = new ValueFactoryImpl(); - final DatatypeFactory dtf = DatatypeFactory.newInstance(); - //Sleep until current time aligns nicely with period to make - //results more predictable - while(System.currentTimeMillis() % (periodMult*1000) > 500); - ZonedDateTime time = ZonedDateTime.now(); - - ZonedDateTime zTime1 = time.minusSeconds(2*periodMult); - String time1 = zTime1.format(DateTimeFormatter.ISO_INSTANT); - - ZonedDateTime zTime2 = zTime1.minusSeconds(periodMult); - String time2 = zTime2.format(DateTimeFormatter.ISO_INSTANT); - - ZonedDateTime zTime3 = zTime2.minusSeconds(periodMult); - String time3 = zTime3.format(DateTimeFormatter.ISO_INSTANT); - - final Collection<Statement> statements = Sets.newHashSet( - vf.createStatement(vf.createURI("urn:obs_1"), vf.createURI("uri:hasTime"), - vf.createLiteral(dtf.newXMLGregorianCalendar(time1))), - vf.createStatement(vf.createURI("urn:obs_1"), vf.createURI("uri:hasId"), vf.createLiteral("id_1")), - vf.createStatement(vf.createURI("urn:obs_2"), vf.createURI("uri:hasTime"), - vf.createLiteral(dtf.newXMLGregorianCalendar(time2))), - vf.createStatement(vf.createURI("urn:obs_2"), vf.createURI("uri:hasId"), vf.createLiteral("id_2")), - vf.createStatement(vf.createURI("urn:obs_3"), vf.createURI("uri:hasTime"), - vf.createLiteral(dtf.newXMLGregorianCalendar(time3))), - vf.createStatement(vf.createURI("urn:obs_3"), vf.createURI("uri:hasId"), vf.createLiteral("id_3"))); - - try (FluoClient fluo = FluoClientFactory.getFluoClient(conf.getFluoAppName(), Optional.of(conf.getFluoTableName()), conf)) { - Connector connector = ConfigUtils.getConnector(conf); - PeriodicQueryResultStorage storage = new AccumuloPeriodicQueryResultStorage(connector, conf.getTablePrefix()); - CreatePeriodicQuery periodicQuery = new CreatePeriodicQuery(fluo, storage); - String id = FluoQueryUtils.convertFluoQueryIdToPcjId(periodicQuery.createPeriodicQuery(sparql, registrar).getQueryId()); - addData(statements); - app.start(); - - Multimap<Long, BindingSet> expected = HashMultimap.create(); - try (KafkaConsumer<String, BindingSet> consumer = new KafkaConsumer<>(kafkaProps, new StringDeserializer(), new BindingSetSerDe())) { - consumer.subscribe(Arrays.asList(id)); - long end = System.currentTimeMillis() + 4*periodMult*1000; - long lastBinId = 0L; - long binId = 0L; - List<Long> ids = new ArrayList<>(); - while (System.currentTimeMillis() < end) { - ConsumerRecords<String, BindingSet> records = consumer.poll(periodMult*1000); - for(ConsumerRecord<String, BindingSet> record: records){ - BindingSet result = record.value(); - binId = Long.parseLong(result.getBinding(IncrementalUpdateConstants.PERIODIC_BIN_ID).getValue().stringValue()); - if(lastBinId != binId) { - lastBinId = binId; - ids.add(binId); - } - expected.put(binId, result); - } - } - - Assert.assertEquals(3, expected.asMap().size()); - int i = 0; - for(Long ident: ids) { - Assert.assertEquals(1, expected.get(ident).size()); - BindingSet bs = expected.get(ident).iterator().next(); - Value val = bs.getValue("total"); - int total = Integer.parseInt(val.stringValue()); - Assert.assertEquals(3-i, total); - i++; - } - } - - - Set<BindingSet> expectedResults = new HashSet<>(); - try (CloseableIterator<BindingSet> results = storage.listResults(id, Optional.empty())) { - results.forEachRemaining(x -> expectedResults.add(x)); - Assert.assertEquals(0, expectedResults.size()); - } - } - - } - - - @Test - public void periodicApplicationTest() throws Exception { - - String sparql = "prefix function: <http://org.apache.rya/function#> " // n - + "prefix time: <http://www.w3.org/2006/time#> " // n - + "select ?obs ?id where {" // n - + "Filter(function:periodic(?time, 1, .25, time:minutes)) " // n - + "?obs <uri:hasTime> ?time. " // n - + "?obs <uri:hasId> ?id } "; // n - - //make data - int periodMult = 15; - final ValueFactory vf = new ValueFactoryImpl(); - final DatatypeFactory dtf = DatatypeFactory.newInstance(); - //Sleep until current time aligns nicely with period to make - //results more predictable - while(System.currentTimeMillis() % (periodMult*1000) > 500); - ZonedDateTime time = ZonedDateTime.now(); - - ZonedDateTime zTime1 = time.minusSeconds(2*periodMult); - String time1 = zTime1.format(DateTimeFormatter.ISO_INSTANT); - - ZonedDateTime zTime2 = zTime1.minusSeconds(periodMult); - String time2 = zTime2.format(DateTimeFormatter.ISO_INSTANT); - - ZonedDateTime zTime3 = zTime2.minusSeconds(periodMult); - String time3 = zTime3.format(DateTimeFormatter.ISO_INSTANT); - - final Collection<Statement> statements = Sets.newHashSet( - vf.createStatement(vf.createURI("urn:obs_1"), vf.createURI("uri:hasTime"), - vf.createLiteral(dtf.newXMLGregorianCalendar(time1))), - vf.createStatement(vf.createURI("urn:obs_1"), vf.createURI("uri:hasId"), vf.createLiteral("id_1")), - vf.createStatement(vf.createURI("urn:obs_2"), vf.createURI("uri:hasTime"), - vf.createLiteral(dtf.newXMLGregorianCalendar(time2))), - vf.createStatement(vf.createURI("urn:obs_2"), vf.createURI("uri:hasId"), vf.createLiteral("id_2")), - vf.createStatement(vf.createURI("urn:obs_3"), vf.createURI("uri:hasTime"), - vf.createLiteral(dtf.newXMLGregorianCalendar(time3))), - vf.createStatement(vf.createURI("urn:obs_3"), vf.createURI("uri:hasId"), vf.createLiteral("id_3"))); - - try (FluoClient fluo = FluoClientFactory.getFluoClient(conf.getFluoAppName(), Optional.of(conf.getFluoTableName()), conf)) { - Connector connector = ConfigUtils.getConnector(conf); - PeriodicQueryResultStorage storage = new AccumuloPeriodicQueryResultStorage(connector, conf.getTablePrefix()); - CreatePeriodicQuery periodicQuery = new CreatePeriodicQuery(fluo, storage); - String id = FluoQueryUtils.convertFluoQueryIdToPcjId(periodicQuery.createPeriodicQuery(sparql, registrar).getQueryId()); - addData(statements); - app.start(); - - Multimap<Long, BindingSet> expected = HashMultimap.create(); - try (KafkaConsumer<String, BindingSet> consumer = new KafkaConsumer<>(kafkaProps, new StringDeserializer(), new BindingSetSerDe())) { - consumer.subscribe(Arrays.asList(id)); - long end = System.currentTimeMillis() + 4*periodMult*1000; - long lastBinId = 0L; - long binId = 0L; - List<Long> ids = new ArrayList<>(); - while (System.currentTimeMillis() < end) { - ConsumerRecords<String, BindingSet> records = consumer.poll(periodMult*1000); - for(ConsumerRecord<String, BindingSet> record: records){ - BindingSet result = record.value(); - binId = Long.parseLong(result.getBinding(IncrementalUpdateConstants.PERIODIC_BIN_ID).getValue().stringValue()); - if(lastBinId != binId) { - lastBinId = binId; - ids.add(binId); - } - expected.put(binId, result); - } - } - - Assert.assertEquals(3, expected.asMap().size()); - int i = 0; - for(Long ident: ids) { - Assert.assertEquals(3-i, expected.get(ident).size()); - i++; - } - } - - - Set<BindingSet> expectedResults = new HashSet<>(); - try (CloseableIterator<BindingSet> results = storage.listResults(id, Optional.empty())) { - results.forEachRemaining(x -> expectedResults.add(x)); - Assert.assertEquals(0, expectedResults.size()); - } - } - - } - - - @After - public void shutdown() { - registrar.close(); - app.stop(); - } - - private void addData(Collection<Statement> statements) throws DatatypeConfigurationException { - // add statements to Fluo - try (FluoClient fluo = new FluoClientImpl(getFluoConfiguration())) { - InsertTriples inserter = new InsertTriples(); - statements.forEach(x -> inserter.insert(fluo, RdfToRyaConversions.convertStatement(x))); - getMiniFluo().waitForObservers(); - } - } - - private static Properties getKafkaProperties(PeriodicNotificationApplicationConfiguration conf) { - Properties kafkaProps = new Properties(); - kafkaProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); - kafkaProps.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString()); - kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, conf.getNotificationGroupId()); - kafkaProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - return kafkaProps; - } - - private Properties getProps() throws IOException { - - Properties props = new Properties(); - try(InputStream in = new FileInputStream("src/test/resources/notification.properties")) { - props.load(in); - } - - FluoConfiguration fluoConf = getFluoConfiguration(); - props.setProperty("accumulo.user", getUsername()); - props.setProperty("accumulo.password", getPassword()); - props.setProperty("accumulo.instance", getMiniAccumuloCluster().getInstanceName()); - props.setProperty("accumulo.zookeepers", getMiniAccumuloCluster().getZooKeepers()); - props.setProperty("accumulo.rya.prefix", getRyaInstanceName()); - props.setProperty(PeriodicNotificationApplicationConfiguration.FLUO_APP_NAME, fluoConf.getApplicationName()); - props.setProperty(PeriodicNotificationApplicationConfiguration.FLUO_TABLE_NAME, fluoConf.getAccumuloTable()); - return props; - } - -}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/application/PeriodicNotificationProviderIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/application/PeriodicNotificationProviderIT.java b/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/application/PeriodicNotificationProviderIT.java deleted file mode 100644 index e05ca6f..0000000 --- a/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/application/PeriodicNotificationProviderIT.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.rya.periodic.notification.application; - -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; - -import org.apache.fluo.api.client.FluoClient; -import org.apache.fluo.core.client.FluoClientImpl; -import org.apache.fluo.recipes.test.AccumuloExportITBase; -import org.apache.rya.indexing.pcj.fluo.api.CreateFluoPcj; -import org.apache.rya.indexing.pcj.fluo.app.query.UnsupportedQueryException; -import org.apache.rya.indexing.pcj.fluo.app.util.FluoQueryUtils; -import org.apache.rya.periodic.notification.coordinator.PeriodicNotificationCoordinatorExecutor; -import org.apache.rya.periodic.notification.notification.TimestampedNotification; -import org.apache.rya.periodic.notification.recovery.PeriodicNotificationProvider; -import org.junit.Assert; -import org.junit.Test; -import org.openrdf.query.MalformedQueryException; - -import com.google.common.collect.Sets; - -public class PeriodicNotificationProviderIT extends AccumuloExportITBase { - - @Test - public void testProvider() throws MalformedQueryException, InterruptedException, UnsupportedQueryException { - - String sparql = "prefix function: <http://org.apache.rya/function#> " // n - + "prefix time: <http://www.w3.org/2006/time#> " // n - + "select ?id (count(?obs) as ?total) where {" // n - + "Filter(function:periodic(?time, 1, .25, time:minutes)) " // n - + "?obs <uri:hasTime> ?time. " // n - + "?obs <uri:hasId> ?id } group by ?id"; // n - - BlockingQueue<TimestampedNotification> notifications = new LinkedBlockingQueue<>(); - PeriodicNotificationCoordinatorExecutor coord = new PeriodicNotificationCoordinatorExecutor(2, notifications); - PeriodicNotificationProvider provider = new PeriodicNotificationProvider(); - CreateFluoPcj pcj = new CreateFluoPcj(); - - String id = null; - try(FluoClient fluo = new FluoClientImpl(getFluoConfiguration())) { - id = pcj.createPcj(FluoQueryUtils.createNewPcjId(), sparql, Sets.newHashSet(), fluo).getQueryId(); - provider.processRegisteredNotifications(coord, fluo.newSnapshot()); - } - - TimestampedNotification notification = notifications.take(); - Assert.assertEquals(5000, notification.getInitialDelay()); - Assert.assertEquals(15000, notification.getPeriod()); - Assert.assertEquals(TimeUnit.MILLISECONDS, notification.getTimeUnit()); - Assert.assertEquals(FluoQueryUtils.convertFluoQueryIdToPcjId(id), notification.getId()); - - } - -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/exporter/PeriodicNotificationExporterIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/exporter/PeriodicNotificationExporterIT.java b/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/exporter/PeriodicNotificationExporterIT.java deleted file mode 100644 index 874e7e2..0000000 --- a/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/exporter/PeriodicNotificationExporterIT.java +++ /dev/null @@ -1,143 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.rya.periodic.notification.exporter; - -import java.util.Arrays; -import java.util.HashSet; -import java.util.Properties; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; - -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.common.serialization.StringDeserializer; -import org.apache.kafka.common.serialization.StringSerializer; -import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage; -import org.apache.rya.kafka.base.KafkaITBase; -import org.apache.rya.kafka.base.KafkaTestInstanceRule; -import org.apache.rya.periodic.notification.api.BindingSetRecord; -import org.apache.rya.periodic.notification.serialization.BindingSetSerDe; -import org.junit.Assert; -import org.junit.Rule; -import org.junit.Test; -import org.openrdf.model.ValueFactory; -import org.openrdf.model.impl.ValueFactoryImpl; -import org.openrdf.query.BindingSet; -import org.openrdf.query.algebra.evaluation.QueryBindingSet; - -public class PeriodicNotificationExporterIT extends KafkaITBase { - - - @Rule - public KafkaTestInstanceRule kafkaTestInstanceRule = new KafkaTestInstanceRule(false); - - - private static final ValueFactory vf = new ValueFactoryImpl(); - - @Test - public void testExporter() throws InterruptedException { - - final String topic1 = kafkaTestInstanceRule.getKafkaTopicName() + "1"; - final String topic2 = kafkaTestInstanceRule.getKafkaTopicName() + "2"; - - kafkaTestInstanceRule.createTopic(topic1); - kafkaTestInstanceRule.createTopic(topic2); - - final BlockingQueue<BindingSetRecord> records = new LinkedBlockingQueue<>(); - - final KafkaExporterExecutor exporter = new KafkaExporterExecutor(new KafkaProducer<String, BindingSet>(createKafkaProducerConfig()), 1, records); - exporter.start(); - final QueryBindingSet bs1 = new QueryBindingSet(); - bs1.addBinding(PeriodicQueryResultStorage.PeriodicBinId, vf.createLiteral(1L)); - bs1.addBinding("name", vf.createURI("uri:Bob")); - final BindingSetRecord record1 = new BindingSetRecord(bs1, topic1); - - final QueryBindingSet bs2 = new QueryBindingSet(); - bs2.addBinding(PeriodicQueryResultStorage.PeriodicBinId, vf.createLiteral(2L)); - bs2.addBinding("name", vf.createURI("uri:Joe")); - final BindingSetRecord record2 = new BindingSetRecord(bs2, topic2); - - records.add(record1); - records.add(record2); - - final Set<BindingSet> expected1 = new HashSet<>(); - expected1.add(bs1); - final Set<BindingSet> expected2 = new HashSet<>(); - expected2.add(bs2); - - final Set<BindingSet> actual1 = getBindingSetsFromKafka(topic1); - final Set<BindingSet> actual2 = getBindingSetsFromKafka(topic2); - - Assert.assertEquals(expected1, actual1); - Assert.assertEquals(expected2, actual2); - - exporter.stop(); - } - - - private Properties createKafkaProducerConfig() { - final Properties props = createBootstrapServerConfig(); - props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); - props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, BindingSetSerDe.class.getName()); - return props; - } - private Properties createKafkaConsumerConfig() { - final Properties props = createBootstrapServerConfig(); - props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString()); - props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "consumer0"); - props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); - props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BindingSetSerDe.class.getName()); - return props; - } - - - private KafkaConsumer<String, BindingSet> makeBindingSetConsumer(final String topicName) { - // setup consumer - final KafkaConsumer<String, BindingSet> consumer = new KafkaConsumer<>(createKafkaConsumerConfig()); - consumer.subscribe(Arrays.asList(topicName)); - return consumer; - } - - private Set<BindingSet> getBindingSetsFromKafka(final String topicName) { - KafkaConsumer<String, BindingSet> consumer = null; - - try { - consumer = makeBindingSetConsumer(topicName); - final ConsumerRecords<String, BindingSet> records = consumer.poll(20000); // Wait up to 20 seconds for a result to be published. - - final Set<BindingSet> bindingSets = new HashSet<>(); - records.forEach(x -> bindingSets.add(x.value())); - - return bindingSets; - - } catch (final Exception e) { - throw new RuntimeException(e); - } finally { - if (consumer != null) { - consumer.close(); - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/processor/PeriodicNotificationProcessorIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/processor/PeriodicNotificationProcessorIT.java b/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/processor/PeriodicNotificationProcessorIT.java deleted file mode 100644 index 21109ae..0000000 --- a/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/processor/PeriodicNotificationProcessorIT.java +++ /dev/null @@ -1,121 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.rya.periodic.notification.processor; - -import java.util.HashSet; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; - -import org.apache.fluo.recipes.test.AccumuloExportITBase; -import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage; -import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPeriodicQueryResultStorage; -import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; -import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; -import org.apache.rya.periodic.notification.api.BindingSetRecord; -import org.apache.rya.periodic.notification.api.NodeBin; -import org.apache.rya.periodic.notification.notification.PeriodicNotification; -import org.apache.rya.periodic.notification.notification.TimestampedNotification; -import org.junit.Assert; -import org.junit.Test; -import org.openrdf.model.ValueFactory; -import org.openrdf.model.impl.ValueFactoryImpl; -import org.openrdf.query.BindingSet; -import org.openrdf.query.algebra.evaluation.QueryBindingSet; - -public class PeriodicNotificationProcessorIT extends AccumuloExportITBase { - - private static final ValueFactory vf = new ValueFactoryImpl(); - private static final String RYA_INSTANCE_NAME = "rya_"; - - @Test - public void periodicProcessorTest() throws Exception { - - String id = UUID.randomUUID().toString().replace("-", ""); - BlockingQueue<TimestampedNotification> notifications = new LinkedBlockingQueue<>(); - BlockingQueue<NodeBin> bins = new LinkedBlockingQueue<>(); - BlockingQueue<BindingSetRecord> bindingSets = new LinkedBlockingQueue<>(); - - TimestampedNotification ts1 = new TimestampedNotification( - PeriodicNotification.builder().id(id).initialDelay(0).period(2000).timeUnit(TimeUnit.SECONDS).build()); - long binId1 = (ts1.getTimestamp().getTime()/ts1.getPeriod())*ts1.getPeriod(); - - Thread.sleep(2000); - - TimestampedNotification ts2 = new TimestampedNotification( - PeriodicNotification.builder().id(id).initialDelay(0).period(2000).timeUnit(TimeUnit.SECONDS).build()); - long binId2 = (ts2.getTimestamp().getTime()/ts2.getPeriod())*ts2.getPeriod(); - - Set<NodeBin> expectedBins = new HashSet<>(); - expectedBins.add(new NodeBin(id, binId1)); - expectedBins.add(new NodeBin(id, binId2)); - - Set<BindingSet> expected = new HashSet<>(); - Set<VisibilityBindingSet> storageResults = new HashSet<>(); - - QueryBindingSet bs1 = new QueryBindingSet(); - bs1.addBinding("periodicBinId", vf.createLiteral(binId1)); - bs1.addBinding("id", vf.createLiteral(1)); - expected.add(bs1); - storageResults.add(new VisibilityBindingSet(bs1)); - - QueryBindingSet bs2 = new QueryBindingSet(); - bs2.addBinding("periodicBinId", vf.createLiteral(binId1)); - bs2.addBinding("id", vf.createLiteral(2)); - expected.add(bs2); - storageResults.add(new VisibilityBindingSet(bs2)); - - QueryBindingSet bs3 = new QueryBindingSet(); - bs3.addBinding("periodicBinId", vf.createLiteral(binId2)); - bs3.addBinding("id", vf.createLiteral(3)); - expected.add(bs3); - storageResults.add(new VisibilityBindingSet(bs3)); - - QueryBindingSet bs4 = new QueryBindingSet(); - bs4.addBinding("periodicBinId", vf.createLiteral(binId2)); - bs4.addBinding("id", vf.createLiteral(4)); - expected.add(bs4); - storageResults.add(new VisibilityBindingSet(bs4)); - - PeriodicQueryResultStorage periodicStorage = new AccumuloPeriodicQueryResultStorage(super.getAccumuloConnector(), - RYA_INSTANCE_NAME); - periodicStorage.createPeriodicQuery(id, "select ?id where {?obs <urn:hasId> ?id.}", new VariableOrder("periodicBinId", "id")); - periodicStorage.addPeriodicQueryResults(id, storageResults); - - NotificationProcessorExecutor processor = new NotificationProcessorExecutor(periodicStorage, notifications, bins, bindingSets, 1); - processor.start(); - - notifications.add(ts1); - notifications.add(ts2); - - Thread.sleep(5000); - - Assert.assertEquals(expectedBins.size(), bins.size()); - Assert.assertEquals(true, bins.containsAll(expectedBins)); - - Set<BindingSet> actual = new HashSet<>(); - bindingSets.forEach(x -> actual.add(x.getBindingSet())); - Assert.assertEquals(expected, actual); - - processor.stop(); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/pruner/PeriodicNotificationBinPrunerIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/pruner/PeriodicNotificationBinPrunerIT.java b/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/pruner/PeriodicNotificationBinPrunerIT.java deleted file mode 100644 index 830fa46..0000000 --- a/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/pruner/PeriodicNotificationBinPrunerIT.java +++ /dev/null @@ -1,283 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.rya.periodic.notification.pruner; - -import java.time.ZonedDateTime; -import java.time.format.DateTimeFormatter; -import java.util.Collection; -import java.util.HashSet; -import java.util.Iterator; -import java.util.Optional; -import java.util.Set; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; - -import javax.xml.datatype.DatatypeFactory; - -import org.apache.fluo.api.client.FluoClient; -import org.apache.fluo.api.client.Snapshot; -import org.apache.fluo.api.client.scanner.ColumnScanner; -import org.apache.fluo.api.client.scanner.RowScanner; -import org.apache.fluo.api.data.Bytes; -import org.apache.fluo.api.data.ColumnValue; -import org.apache.fluo.api.data.Span; -import org.apache.fluo.core.client.FluoClientImpl; -import org.apache.rya.api.resolver.RdfToRyaConversions; -import org.apache.rya.indexing.pcj.fluo.api.CreatePeriodicQuery; -import org.apache.rya.indexing.pcj.fluo.api.InsertTriples; -import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants; -import org.apache.rya.indexing.pcj.fluo.app.NodeType; -import org.apache.rya.indexing.pcj.fluo.app.util.FluoQueryUtils; -import org.apache.rya.indexing.pcj.fluo.app.util.PeriodicQueryUtil; -import org.apache.rya.indexing.pcj.fluo.app.util.RowKeyUtil; -import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage; -import org.apache.rya.indexing.pcj.storage.PeriodicQueryStorageException; -import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator; -import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPeriodicQueryResultStorage; -import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; -import org.apache.rya.pcj.fluo.test.base.RyaExportITBase; -import org.apache.rya.periodic.notification.api.NodeBin; -import org.junit.Assert; -import org.junit.Test; -import org.openrdf.model.Statement; -import org.openrdf.model.ValueFactory; -import org.openrdf.model.impl.LiteralImpl; -import org.openrdf.model.impl.ValueFactoryImpl; -import org.openrdf.model.vocabulary.XMLSchema; -import org.openrdf.query.BindingSet; -import org.openrdf.query.algebra.evaluation.QueryBindingSet; -import org.openrdf.query.impl.MapBindingSet; - -import com.google.common.collect.Sets; - -public class PeriodicNotificationBinPrunerIT extends RyaExportITBase { - - - @Test - public void periodicPrunerTest() throws Exception { - - String sparql = "prefix function: <http://org.apache.rya/function#> " // n - + "prefix time: <http://www.w3.org/2006/time#> " // n - + "select ?id (count(?obs) as ?total) where {" // n - + "Filter(function:periodic(?time, 2, .5, time:hours)) " // n - + "?obs <uri:hasTime> ?time. " // n - + "?obs <uri:hasId> ?id } group by ?id"; // n - - FluoClient fluo = new FluoClientImpl(super.getFluoConfiguration()); - - // initialize resources and create pcj - PeriodicQueryResultStorage periodicStorage = new AccumuloPeriodicQueryResultStorage(super.getAccumuloConnector(), - getRyaInstanceName()); - CreatePeriodicQuery createPeriodicQuery = new CreatePeriodicQuery(fluo, periodicStorage); - String queryId = FluoQueryUtils.convertFluoQueryIdToPcjId(createPeriodicQuery.createPeriodicQuery(sparql).getQueryId()); - - // create statements to ingest into Fluo - final ValueFactory vf = new ValueFactoryImpl(); - final DatatypeFactory dtf = DatatypeFactory.newInstance(); - ZonedDateTime time = ZonedDateTime.now(); - long currentTime = time.toInstant().toEpochMilli(); - - ZonedDateTime zTime1 = time.minusMinutes(30); - String time1 = zTime1.format(DateTimeFormatter.ISO_INSTANT); - - ZonedDateTime zTime2 = zTime1.minusMinutes(30); - String time2 = zTime2.format(DateTimeFormatter.ISO_INSTANT); - - ZonedDateTime zTime3 = zTime2.minusMinutes(30); - String time3 = zTime3.format(DateTimeFormatter.ISO_INSTANT); - - ZonedDateTime zTime4 = zTime3.minusMinutes(30); - String time4 = zTime4.format(DateTimeFormatter.ISO_INSTANT); - - final Collection<Statement> statements = Sets.newHashSet( - vf.createStatement(vf.createURI("urn:obs_1"), vf.createURI("uri:hasTime"), - vf.createLiteral(dtf.newXMLGregorianCalendar(time1))), - vf.createStatement(vf.createURI("urn:obs_1"), vf.createURI("uri:hasId"), vf.createLiteral("id_1")), - vf.createStatement(vf.createURI("urn:obs_2"), vf.createURI("uri:hasTime"), - vf.createLiteral(dtf.newXMLGregorianCalendar(time2))), - vf.createStatement(vf.createURI("urn:obs_2"), vf.createURI("uri:hasId"), vf.createLiteral("id_2")), - vf.createStatement(vf.createURI("urn:obs_3"), vf.createURI("uri:hasTime"), - vf.createLiteral(dtf.newXMLGregorianCalendar(time3))), - vf.createStatement(vf.createURI("urn:obs_3"), vf.createURI("uri:hasId"), vf.createLiteral("id_3")), - vf.createStatement(vf.createURI("urn:obs_4"), vf.createURI("uri:hasTime"), - vf.createLiteral(dtf.newXMLGregorianCalendar(time4))), - vf.createStatement(vf.createURI("urn:obs_4"), vf.createURI("uri:hasId"), vf.createLiteral("id_4")), - vf.createStatement(vf.createURI("urn:obs_1"), vf.createURI("uri:hasTime"), - vf.createLiteral(dtf.newXMLGregorianCalendar(time4))), - vf.createStatement(vf.createURI("urn:obs_1"), vf.createURI("uri:hasId"), vf.createLiteral("id_1")), - vf.createStatement(vf.createURI("urn:obs_2"), vf.createURI("uri:hasTime"), - vf.createLiteral(dtf.newXMLGregorianCalendar(time3))), - vf.createStatement(vf.createURI("urn:obs_2"), vf.createURI("uri:hasId"), vf.createLiteral("id_2"))); - - // add statements to Fluo - InsertTriples inserter = new InsertTriples(); - statements.forEach(x -> inserter.insert(fluo, RdfToRyaConversions.convertStatement(x))); - - super.getMiniFluo().waitForObservers(); - - // FluoITHelper.printFluoTable(fluo); - - // Create the expected results of the SPARQL query once the PCJ has been - // computed. - final Set<BindingSet> expected1 = new HashSet<>(); - final Set<BindingSet> expected2 = new HashSet<>(); - final Set<BindingSet> expected3 = new HashSet<>(); - final Set<BindingSet> expected4 = new HashSet<>(); - - long period = 1800000; - long binId = (currentTime / period) * period; - - long bin1 = binId; - long bin2 = binId + period; - long bin3 = binId + 2 * period; - long bin4 = binId + 3 * period; - - MapBindingSet bs = new MapBindingSet(); - bs.addBinding("total", vf.createLiteral("2", XMLSchema.INTEGER)); - bs.addBinding("id", vf.createLiteral("id_1", XMLSchema.STRING)); - bs.addBinding("periodicBinId", vf.createLiteral(bin1)); - expected1.add(bs); - - bs = new MapBindingSet(); - bs.addBinding("total", vf.createLiteral("2", XMLSchema.INTEGER)); - bs.addBinding("id", vf.createLiteral("id_2", XMLSchema.STRING)); - bs.addBinding("periodicBinId", vf.createLiteral(bin1)); - expected1.add(bs); - - bs = new MapBindingSet(); - bs.addBinding("total", vf.createLiteral("1", XMLSchema.INTEGER)); - bs.addBinding("id", vf.createLiteral("id_3", XMLSchema.STRING)); - bs.addBinding("periodicBinId", vf.createLiteral(bin1)); - expected1.add(bs); - - bs = new MapBindingSet(); - bs.addBinding("total", vf.createLiteral("1", XMLSchema.INTEGER)); - bs.addBinding("id", vf.createLiteral("id_4", XMLSchema.STRING)); - bs.addBinding("periodicBinId", vf.createLiteral(bin1)); - expected1.add(bs); - - bs = new MapBindingSet(); - bs.addBinding("total", vf.createLiteral("1", XMLSchema.INTEGER)); - bs.addBinding("id", vf.createLiteral("id_1", XMLSchema.STRING)); - bs.addBinding("periodicBinId", vf.createLiteral(bin2)); - expected2.add(bs); - - bs = new MapBindingSet(); - bs.addBinding("total", vf.createLiteral("2", XMLSchema.INTEGER)); - bs.addBinding("id", vf.createLiteral("id_2", XMLSchema.STRING)); - bs.addBinding("periodicBinId", vf.createLiteral(bin2)); - expected2.add(bs); - - bs = new MapBindingSet(); - bs.addBinding("total", vf.createLiteral("1", XMLSchema.INTEGER)); - bs.addBinding("id", vf.createLiteral("id_3", XMLSchema.STRING)); - bs.addBinding("periodicBinId", vf.createLiteral(bin2)); - expected2.add(bs); - - bs = new MapBindingSet(); - bs.addBinding("total", vf.createLiteral("1", XMLSchema.INTEGER)); - bs.addBinding("id", vf.createLiteral("id_1", XMLSchema.STRING)); - bs.addBinding("periodicBinId", vf.createLiteral(bin3)); - expected3.add(bs); - - bs = new MapBindingSet(); - bs.addBinding("total", vf.createLiteral("1", XMLSchema.INTEGER)); - bs.addBinding("id", vf.createLiteral("id_2", XMLSchema.STRING)); - bs.addBinding("periodicBinId", vf.createLiteral(bin3)); - expected3.add(bs); - - bs = new MapBindingSet(); - bs.addBinding("total", vf.createLiteral("1", XMLSchema.INTEGER)); - bs.addBinding("id", vf.createLiteral("id_1", XMLSchema.STRING)); - bs.addBinding("periodicBinId", vf.createLiteral(bin4)); - expected4.add(bs); - - // make sure that expected and actual results align after ingest - compareResults(periodicStorage, queryId, bin1, expected1); - compareResults(periodicStorage, queryId, bin2, expected2); - compareResults(periodicStorage, queryId, bin3, expected3); - compareResults(periodicStorage, queryId, bin4, expected4); - - BlockingQueue<NodeBin> bins = new LinkedBlockingQueue<>(); - PeriodicQueryPrunerExecutor pruner = new PeriodicQueryPrunerExecutor(periodicStorage, fluo, 1, bins); - pruner.start(); - - bins.add(new NodeBin(queryId, bin1)); - bins.add(new NodeBin(queryId, bin2)); - bins.add(new NodeBin(queryId, bin3)); - bins.add(new NodeBin(queryId, bin4)); - - Thread.sleep(10000); - - compareResults(periodicStorage, queryId, bin1, new HashSet<>()); - compareResults(periodicStorage, queryId, bin2, new HashSet<>()); - compareResults(periodicStorage, queryId, bin3, new HashSet<>()); - compareResults(periodicStorage, queryId, bin4, new HashSet<>()); - - compareFluoCounts(fluo, queryId, bin1); - compareFluoCounts(fluo, queryId, bin2); - compareFluoCounts(fluo, queryId, bin3); - compareFluoCounts(fluo, queryId, bin4); - - pruner.stop(); - - } - - private void compareResults(PeriodicQueryResultStorage periodicStorage, String queryId, long bin, Set<BindingSet> expected) throws PeriodicQueryStorageException, Exception { - try(CloseableIterator<BindingSet> iter = periodicStorage.listResults(queryId, Optional.of(bin))) { - Set<BindingSet> actual = new HashSet<>(); - while(iter.hasNext()) { - actual.add(iter.next()); - } - Assert.assertEquals(expected, actual); - } - } - - private void compareFluoCounts(FluoClient client, String pcjId, long bin) { - QueryBindingSet bs = new QueryBindingSet(); - bs.addBinding(IncrementalUpdateConstants.PERIODIC_BIN_ID, new LiteralImpl(Long.toString(bin), XMLSchema.LONG)); - - VariableOrder varOrder = new VariableOrder(IncrementalUpdateConstants.PERIODIC_BIN_ID); - - try(Snapshot sx = client.newSnapshot()) { - String fluoQueryId = NodeType.generateNewIdForType(NodeType.QUERY, pcjId); - Set<String> ids = new HashSet<>(); - PeriodicQueryUtil.getPeriodicQueryNodeAncestorIds(sx, fluoQueryId, ids); - for(String id: ids) { - NodeType optNode = NodeType.fromNodeId(id).orNull(); - if(optNode == null) throw new RuntimeException("Invalid NodeType."); - Bytes prefix = RowKeyUtil.makeRowKey(id,varOrder, bs); - RowScanner scanner = sx.scanner().fetch(optNode.getResultColumn()).over(Span.prefix(prefix)).byRow().build(); - int count = 0; - Iterator<ColumnScanner> colScannerIter = scanner.iterator(); - while(colScannerIter.hasNext()) { - ColumnScanner colScanner = colScannerIter.next(); - String row = colScanner.getRow().toString(); - Iterator<ColumnValue> values = colScanner.iterator(); - while(values.hasNext()) { - values.next(); - count++; - } - } - Assert.assertEquals(0, count); - } - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/registration/kafka/PeriodicCommandNotificationConsumerIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/registration/kafka/PeriodicCommandNotificationConsumerIT.java b/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/registration/kafka/PeriodicCommandNotificationConsumerIT.java deleted file mode 100644 index 522e69d..0000000 --- a/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/registration/kafka/PeriodicCommandNotificationConsumerIT.java +++ /dev/null @@ -1,139 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */package org.apache.rya.periodic.notification.registration.kafka; - -import java.util.Properties; -import java.util.UUID; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; - -import org.apache.kafka.clients.CommonClientConfigs; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.common.serialization.StringDeserializer; -import org.apache.kafka.common.serialization.StringSerializer; -import org.apache.log4j.BasicConfigurator; -import org.apache.rya.kafka.base.KafkaITBase; -import org.apache.rya.kafka.base.KafkaTestInstanceRule; -import org.apache.rya.periodic.notification.coordinator.PeriodicNotificationCoordinatorExecutor; -import org.apache.rya.periodic.notification.notification.CommandNotification; -import org.apache.rya.periodic.notification.notification.TimestampedNotification; -import org.apache.rya.periodic.notification.registration.KafkaNotificationRegistrationClient; -import org.apache.rya.periodic.notification.serialization.CommandNotificationSerializer; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; - -public class PeriodicCommandNotificationConsumerIT extends KafkaITBase { - - private KafkaNotificationRegistrationClient registration; - private PeriodicNotificationCoordinatorExecutor coord; - private KafkaNotificationProvider provider; - private String bootstrapServer; - - @Rule - public KafkaTestInstanceRule rule = new KafkaTestInstanceRule(false); - - @Before - public void init() throws Exception { - bootstrapServer = createBootstrapServerConfig().getProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG); - } - - @Test - public void kafkaNotificationProviderTest() throws InterruptedException { - - BasicConfigurator.configure(); - - BlockingQueue<TimestampedNotification> notifications = new LinkedBlockingQueue<>(); - Properties props = createKafkaConfig(); - KafkaProducer<String, CommandNotification> producer = new KafkaProducer<>(props); - String topic = rule.getKafkaTopicName(); - rule.createTopic(topic); - - registration = new KafkaNotificationRegistrationClient(topic, producer); - coord = new PeriodicNotificationCoordinatorExecutor(1, notifications); - provider = new KafkaNotificationProvider(topic, new StringDeserializer(), new CommandNotificationSerializer(), props, coord, 1); - provider.start(); - - registration.addNotification("1", 1, 0, TimeUnit.SECONDS); - Thread.sleep(4000); - // check that notifications are being added to the blocking queue - Assert.assertEquals(true, notifications.size() > 0); - - registration.deleteNotification("1"); - Thread.sleep(2000); - int size = notifications.size(); - // sleep for 2 seconds to ensure no more messages being produced - Thread.sleep(2000); - Assert.assertEquals(size, notifications.size()); - - tearDown(); - } - - @Test - public void kafkaNotificationMillisProviderTest() throws InterruptedException { - - BasicConfigurator.configure(); - - BlockingQueue<TimestampedNotification> notifications = new LinkedBlockingQueue<>(); - Properties props = createKafkaConfig(); - KafkaProducer<String, CommandNotification> producer = new KafkaProducer<>(props); - String topic = rule.getKafkaTopicName(); - rule.createTopic(topic); - - registration = new KafkaNotificationRegistrationClient(topic, producer); - coord = new PeriodicNotificationCoordinatorExecutor(1, notifications); - provider = new KafkaNotificationProvider(topic, new StringDeserializer(), new CommandNotificationSerializer(), props, coord, 1); - provider.start(); - - registration.addNotification("1", 1000, 0, TimeUnit.MILLISECONDS); - Thread.sleep(4000); - // check that notifications are being added to the blocking queue - Assert.assertEquals(true, notifications.size() > 0); - - registration.deleteNotification("1"); - Thread.sleep(2000); - int size = notifications.size(); - // sleep for 2 seconds to ensure no more messages being produced - Thread.sleep(2000); - Assert.assertEquals(size, notifications.size()); - - tearDown(); - } - - private void tearDown() { - registration.close(); - provider.stop(); - coord.stop(); - } - - private Properties createKafkaConfig() { - Properties props = new Properties(); - props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer); - props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString()); - props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "consumer0"); - props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); - props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, CommandNotificationSerializer.class.getName()); - - return props; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/rya.periodic.service/periodic.service.integration.tests/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/extras/rya.periodic.service/periodic.service.integration.tests/src/test/resources/log4j.properties b/extras/rya.periodic.service/periodic.service.integration.tests/src/test/resources/log4j.properties deleted file mode 100644 index 19cc13c..0000000 --- a/extras/rya.periodic.service/periodic.service.integration.tests/src/test/resources/log4j.properties +++ /dev/null @@ -1,37 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -# Valid levels: -# TRACE, DEBUG, INFO, WARN, ERROR and FATAL -log4j.rootLogger=INFO, CONSOLE - -# Set independent logging levels -log4j.logger.org.apache.zookeeper=WARN -log4j.logger.kafka=WARN -log4j.logger.org.apache.kafka=WARN - -# LOGFILE is set to be a File appender using a PatternLayout. -log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender -#log4j.appender.CONSOLE.Threshold=DEBUG - -log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout -log4j.appender.CONSOLE.layout.ConversionPattern=%d [%t] %-5p %c - %m%n - -#log4j.appender.CONSOLE.layout=org.apache.log4j.EnhancedPatternLayout -#log4j.appender.CONSOLE.layout.ConversionPattern=%d [%t] %-5p %c{1.} - %m%n \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/rya.periodic.service/periodic.service.integration.tests/src/test/resources/notification.properties ---------------------------------------------------------------------- diff --git a/extras/rya.periodic.service/periodic.service.integration.tests/src/test/resources/notification.properties b/extras/rya.periodic.service/periodic.service.integration.tests/src/test/resources/notification.properties deleted file mode 100644 index 4b25b93..0000000 --- a/extras/rya.periodic.service/periodic.service.integration.tests/src/test/resources/notification.properties +++ /dev/null @@ -1,35 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -#/ -accumulo.auths= -accumulo.instance="instance" -accumulo.user="root" -accumulo.password="secret" -accumulo.rya.prefix="rya_" -accumulo.zookeepers= -fluo.app.name="fluo_app" -fluo.table.name="fluo_table" -kafka.bootstrap.servers=127.0.0.1:9092 -kafka.notification.topic=notifications -kafka.notification.client.id=consumer0 -kafka.notification.group.id=group0 -cep.coordinator.threads=1 -cep.producer.threads=1 -cep.exporter.threads=1 -cep.processor.threads=1 -cep.pruner.threads=1 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/rya.periodic.service/periodic.service.notification/pom.xml ---------------------------------------------------------------------- diff --git a/extras/rya.periodic.service/periodic.service.notification/pom.xml b/extras/rya.periodic.service/periodic.service.notification/pom.xml deleted file mode 100644 index 1e59e15..0000000 --- a/extras/rya.periodic.service/periodic.service.notification/pom.xml +++ /dev/null @@ -1,112 +0,0 @@ -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <modelVersion>4.0.0</modelVersion> - <!-- Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with this - work for additional information regarding copyright ownership. The ASF licenses - this file to you under the Apache License, Version 2.0 (the "License"); you - may not use this file except in compliance with the License. You may obtain - a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless - required by applicable law or agreed to in writing, software distributed - under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES - OR CONDITIONS OF ANY KIND, either express or implied. See the License for - the specific language governing permissions and limitations under the License. --> - <parent> - <groupId>org.apache.rya</groupId> - <artifactId>rya.periodic.service</artifactId> - <version>3.2.11-incubating-SNAPSHOT</version> - </parent> - - <artifactId>rya.periodic.service.notification</artifactId> - - <name>Apache Rya Periodic Service Notification</name> - <description>Notifications for Rya Periodic Service</description> - - <dependencies> - - <dependency> - <groupId>org.apache.twill</groupId> - <artifactId>twill-api</artifactId> - <version>0.11.0</version> - </dependency> - <dependency> - <groupId>org.apache.twill</groupId> - <artifactId>twill-yarn</artifactId> - <version>0.11.0</version> - <exclusions> - <exclusion> - <artifactId>kafka_2.10</artifactId> - <groupId>org.apache.kafka</groupId> - </exclusion> - </exclusions> - </dependency> - <dependency> - <groupId>com.google.code.gson</groupId> - <artifactId>gson</artifactId> - <version>2.8.0</version> - </dependency> - <dependency> - <groupId>junit</groupId> - <artifactId>junit</artifactId> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.apache.fluo</groupId> - <artifactId>fluo-api</artifactId> - </dependency> - <dependency> - <groupId>org.apache.fluo</groupId> - <artifactId>fluo-core</artifactId> - </dependency> - <dependency> - <groupId>org.apache.rya</groupId> - <artifactId>rya.indexing</artifactId> - </dependency> - <dependency> - <groupId>org.openrdf.sesame</groupId> - <artifactId>sesame-query</artifactId> - </dependency> - <dependency> - <groupId>org.apache.rya</groupId> - <artifactId>rya.indexing.pcj</artifactId> - </dependency> - <dependency> - <groupId>org.apache.rya</groupId> - <artifactId>rya.pcj.fluo.app</artifactId> - </dependency> - <dependency> - <groupId>org.apache.rya</groupId> - <artifactId>rya.periodic.service.api</artifactId> - </dependency> - - </dependencies> - - <build> - <plugins> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-compiler-plugin</artifactId> - <configuration> - <encoding>UTF-8</encoding> - <source>1.8</source> - <target>1.8</target> - </configuration> - </plugin> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-shade-plugin</artifactId> - <version>3.0.0</version> - <executions> - <execution> - <phase>package</phase> - <goals> - <goal>shade</goal> - </goals> - </execution> - </executions> - </plugin> - </plugins> - </build> - - -</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/application/PeriodicApplicationException.java ---------------------------------------------------------------------- diff --git a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/application/PeriodicApplicationException.java b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/application/PeriodicApplicationException.java deleted file mode 100644 index b2c3709..0000000 --- a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/application/PeriodicApplicationException.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.rya.periodic.notification.application; - -/** - * Exception thrown when attempting to create a {@link PeriodicNotificationApplication}. - * Indicates that a factory was unable to create some component of the application - * because something was configured incorrectly. - * - */ -public class PeriodicApplicationException extends Exception { - - private static final long serialVersionUID = 1L; - - /** - * Creates a PeriodicApplicationException. - * @param message - message contained in Exception - */ - public PeriodicApplicationException(String message) { - super(message); - } - - /** - * Creates a PeriodicApplicationException. - * @param message - message contained in Exception - * @param t - Exception that spawned this PeriodicApplicationException - */ - public PeriodicApplicationException(String message, Throwable t) { - super(message, t); - } -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplication.java ---------------------------------------------------------------------- diff --git a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplication.java b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplication.java deleted file mode 100644 index 92a7d18..0000000 --- a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplication.java +++ /dev/null @@ -1,207 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.rya.periodic.notification.application; - -import org.apache.log4j.Logger; -import org.apache.rya.indexing.pcj.fluo.app.util.PeriodicQueryUtil; -import org.apache.rya.periodic.notification.api.BinPruner; -import org.apache.rya.periodic.notification.api.BindingSetRecord; -import org.apache.rya.periodic.notification.api.LifeCycle; -import org.apache.rya.periodic.notification.api.NodeBin; -import org.apache.rya.periodic.notification.api.NotificationCoordinatorExecutor; -import org.apache.rya.periodic.notification.exporter.KafkaExporterExecutor; -import org.apache.rya.periodic.notification.processor.NotificationProcessorExecutor; -import org.apache.rya.periodic.notification.pruner.PeriodicQueryPrunerExecutor; -import org.apache.rya.periodic.notification.registration.kafka.KafkaNotificationProvider; -import org.openrdf.query.algebra.evaluation.function.Function; - -import com.google.common.base.Preconditions; - -/** - * The PeriodicNotificationApplication runs the key components of the Periodic - * Query Service. It consists of a {@link KafkaNotificationProvider}, a - * {@link NotificationCoordinatorExecutor}, a - * {@link NotificationProcessorExecutor}, a {@link KafkaExporterExecutor}, and a - * {@link PeriodicQueryPrunerExecutor}. These services run in coordination with - * one another to perform the following tasks in the indicated order: <br> - * <li>Retrieve new requests to generate periodic notifications from Kafka - * <li>Register them with the {@link NotificationCoordinatorExecutor} to - * generate the periodic notifications - * <li>As notifications are generated, they are added to a work queue that is - * monitored by the {@link NotificationProcessorExecutor}. - * <li>The processor processes the notifications by reading all of the query - * results corresponding to the bin and query id indicated by the notification. - * <li>After reading the results, the processor adds a {@link BindingSetRecord} - * to a work queue monitored by the {@link KafkaExporterExecutor}. - * <li>The processor then adds a {@link NodeBin} to a workqueue monitored by the - * {@link BinPruner} - * <li>The exporter processes the BindingSetRecord by exporing the result to - * Kafka - * <li>The BinPruner processes the NodeBin by cleaning up the results for the - * indicated bin and query in Accumulo and Fluo. <br> - * <br> - * The purpose of this Periodic Query Service is to facilitate the ability to - * answer Periodic Queries using the Rya Fluo application, where a Periodic - * Query is any query requesting periodic updates about events that occurred - * within a given window of time of this instant. This is also known as a - * rolling window query. Period Queries can be expressed using SPARQL by - * including the {@link Function} indicated by the URI - * {@link PeriodicQueryUtil#PeriodicQueryURI}. The user must provide this - * Function with the following arguments: the temporal variable in the query - * that will be filtered on, the window of time that events must occur within, - * the period at which the user wants to receive updates, and the time unit. The - * following query requests all observations that occurred within the last - * minute and requests updates every 15 seconds. It also performs a count on - * those observations. <br> - * <br> - * <li>prefix function: http://org.apache.rya/function# - * <li>"prefix time: http://www.w3.org/2006/time# - * <li>"select (count(?obs) as ?total) where { - * <li>"Filter(function:periodic(?time, 1, .25, time:minutes)) - * <li>"?obs uri:hasTime ?time. - * <li>"?obs uri:hasId ?id } - * <li> - */ -public class PeriodicNotificationApplication implements LifeCycle { - - private static final Logger log = Logger.getLogger(PeriodicNotificationApplication.class); - private NotificationCoordinatorExecutor coordinator; - private KafkaNotificationProvider provider; - private PeriodicQueryPrunerExecutor pruner; - private NotificationProcessorExecutor processor; - private KafkaExporterExecutor exporter; - private boolean running = false; - - /** - * Creates a PeriodicNotificationApplication - * @param provider - {@link KafkaNotificationProvider} that retrieves new Notificaiton requests from Kafka - * @param coordinator - {NotificationCoordinator} that manages PeriodicNotifications. - * @param processor - {@link NotificationProcessorExecutor} that processes PeriodicNotifications - * @param exporter - {@link KafkaExporterExecutor} that exports periodic results - * @param pruner - {@link PeriodicQueryPrunerExecutor} that cleans up old periodic bins - */ - public PeriodicNotificationApplication(KafkaNotificationProvider provider, NotificationCoordinatorExecutor coordinator, - NotificationProcessorExecutor processor, KafkaExporterExecutor exporter, PeriodicQueryPrunerExecutor pruner) { - this.provider = Preconditions.checkNotNull(provider); - this.coordinator = Preconditions.checkNotNull(coordinator); - this.processor = Preconditions.checkNotNull(processor); - this.exporter = Preconditions.checkNotNull(exporter); - this.pruner = Preconditions.checkNotNull(pruner); - } - - @Override - public void start() { - if (!running) { - log.info("Starting PeriodicNotificationApplication."); - coordinator.start(); - provider.start(); - processor.start(); - pruner.start(); - exporter.start(); - running = true; - } - } - - @Override - public void stop() { - log.info("Stopping PeriodicNotificationApplication."); - provider.stop(); - coordinator.stop(); - processor.stop(); - pruner.stop(); - exporter.stop(); - running = false; - } - - /** - * @return boolean indicating whether the application is running - */ - @Override - public boolean currentlyRunning() { - return running; - } - - public static Builder builder() { - return new Builder(); - } - - public static class Builder { - - private PeriodicQueryPrunerExecutor pruner; - private KafkaNotificationProvider provider; - private NotificationProcessorExecutor processor; - private KafkaExporterExecutor exporter; - private NotificationCoordinatorExecutor coordinator; - - /** - * Sets the PeriodicQueryPrunerExecutor. - * @param pruner - PeriodicQueryPrunerExecutor for cleaning up old periodic bins - * @return this Builder for chaining method calls - */ - public Builder setPruner(PeriodicQueryPrunerExecutor pruner) { - this.pruner = pruner; - return this; - } - - /** - * Sets the KafkaNotificationProvider - * @param provider - KafkaNotificationProvider for retrieving new periodic notification requests from Kafka - * @return this Builder for chaining method calls - */ - public Builder setProvider(KafkaNotificationProvider provider) { - this.provider = provider; - return this; - } - - public Builder setProcessor(NotificationProcessorExecutor processor) { - this.processor = processor; - return this; - } - - /** - * Sets KafkaExporterExecutor - * @param exporter for exporting periodic query results to Kafka - * @return this Builder for chaining method calls - */ - public Builder setExporter(KafkaExporterExecutor exporter) { - this.exporter = exporter; - return this; - } - - /** - * Sets NotificationCoordinatorExecutor - * @param coordinator for managing and generating periodic notifications - * @return this Builder for chaining method calls - */ - public Builder setCoordinator(NotificationCoordinatorExecutor coordinator) { - this.coordinator = coordinator; - return this; - } - - /** - * Creates a PeriodicNotificationApplication - * @return PeriodicNotificationApplication for periodically polling Rya Fluo Application - */ - public PeriodicNotificationApplication build() { - return new PeriodicNotificationApplication(provider, coordinator, processor, exporter, pruner); - } - - } - -}