http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/registration/kafka/KafkaNotificationProvider.java ---------------------------------------------------------------------- diff --git a/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/registration/kafka/KafkaNotificationProvider.java b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/registration/kafka/KafkaNotificationProvider.java new file mode 100644 index 0000000..f5cd13a --- /dev/null +++ b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/registration/kafka/KafkaNotificationProvider.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.periodic.notification.registration.kafka; + +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.rya.periodic.notification.api.LifeCycle; +import org.apache.rya.periodic.notification.api.Notification; +import org.apache.rya.periodic.notification.api.NotificationCoordinatorExecutor; +import org.apache.rya.periodic.notification.notification.CommandNotification; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Consumer group to pull all requests for adding and deleting {@link Notification}s + * from Kafka. This Object executes {@link PeriodicNotificationConsumer}s that retrieve + * the {@link CommandNotification}s and register them with the {@link NotificationCoordinatorExecutor}. + * + */ +public class KafkaNotificationProvider implements LifeCycle { + private static final Logger LOG = LoggerFactory.getLogger(KafkaNotificationProvider.class); + private String topic; + private ExecutorService executor; + private NotificationCoordinatorExecutor coord; + private Properties props; + private int numThreads; + private boolean running = false; + Deserializer<String> keyDe; + Deserializer<CommandNotification> valDe; + List<PeriodicNotificationConsumer> consumers; + + /** + * Create KafkaNotificationProvider for reading new notification requests form Kafka + * @param topic - notification topic + * @param keyDe - Kafka message key deserializer + * @param valDe - Kafka message value deserializer + * @param props - properties used to creates a {@link KafkaConsumer} + * @param coord - {@link NotificationCoordinatorExecutor} for managing and generating notifications + * @param numThreads - number of threads used by this notification provider + */ + public KafkaNotificationProvider(String topic, Deserializer<String> keyDe, Deserializer<CommandNotification> valDe, Properties props, + NotificationCoordinatorExecutor coord, int numThreads) { + this.coord = coord; + this.numThreads = numThreads; + this.topic = topic; + this.props = props; + this.consumers = new ArrayList<>(); + this.keyDe = keyDe; + this.valDe = valDe; + } + + @Override + public void stop() { + if (consumers != null && consumers.size() > 0) { + for (PeriodicNotificationConsumer consumer : consumers) { + consumer.shutdown(); + } + } + if (executor != null) { + executor.shutdown(); + } + running = false; + try { + if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) { + LOG.info("Timed out waiting for consumer threads to shut down, exiting uncleanly"); + executor.shutdownNow(); + } + } catch (InterruptedException e) { + LOG.info("Interrupted during shutdown, exiting uncleanly"); + } + } + + public void start() { + if (!running) { + if (!coord.currentlyRunning()) { + coord.start(); + } + // now launch all the threads + executor = Executors.newFixedThreadPool(numThreads); + + // now create consumers to consume the messages + int threadNumber = 0; + for (int i = 0; i < numThreads; i++) { + LOG.info("Creating consumer:" + threadNumber); + KafkaConsumer<String, CommandNotification> consumer = new KafkaConsumer<String, CommandNotification>(props, keyDe, valDe); + PeriodicNotificationConsumer periodicConsumer = new PeriodicNotificationConsumer(topic, consumer, threadNumber, coord); + consumers.add(periodicConsumer); + executor.submit(periodicConsumer); + threadNumber++; + } + running = true; + } + } + + @Override + public boolean currentlyRunning() { + return running; + } + +}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/registration/kafka/PeriodicNotificationConsumer.java ---------------------------------------------------------------------- diff --git a/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/registration/kafka/PeriodicNotificationConsumer.java b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/registration/kafka/PeriodicNotificationConsumer.java new file mode 100644 index 0000000..6785ce8 --- /dev/null +++ b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/registration/kafka/PeriodicNotificationConsumer.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.periodic.notification.registration.kafka; + +import java.util.Arrays; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.errors.WakeupException; +import org.apache.log4j.Logger; +import org.apache.rya.periodic.notification.api.NotificationCoordinatorExecutor; +import org.apache.rya.periodic.notification.notification.CommandNotification; + +/** + * Consumer for the {@link KafkaNotificationProvider}. This consumer pull messages + * from Kafka and registers them with the {@link NotificationCoordinatorExecutor}. + * + */ +public class PeriodicNotificationConsumer implements Runnable { + private KafkaConsumer<String, CommandNotification> consumer; + private int m_threadNumber; + private String topic; + private final AtomicBoolean closed = new AtomicBoolean(false); + private NotificationCoordinatorExecutor coord; + private static final Logger LOG = Logger.getLogger(PeriodicNotificationConsumer.class); + + /** + * Creates a new PeriodicNotificationConsumer for consuming new notification requests from + * Kafka. + * @param topic - new notification topic + * @param consumer - consumer for pulling new requests from Kafka + * @param a_threadNumber - number of consumer threads to be used + * @param coord - notification coordinator for managing and generating notifications + */ + public PeriodicNotificationConsumer(String topic, KafkaConsumer<String, CommandNotification> consumer, int a_threadNumber, + NotificationCoordinatorExecutor coord) { + this.topic = topic; + m_threadNumber = a_threadNumber; + this.consumer = consumer; + this.coord = coord; + } + + public void run() { + + try { + LOG.info("Creating kafka stream for consumer:" + m_threadNumber); + consumer.subscribe(Arrays.asList(topic)); + while (!closed.get()) { + ConsumerRecords<String, CommandNotification> records = consumer.poll(10000); + // Handle new records + for(ConsumerRecord<String, CommandNotification> record: records) { + CommandNotification notification = record.value(); + LOG.info("Thread " + m_threadNumber + " is adding notification " + notification + " to queue."); + LOG.info("Message: " + notification); + coord.processNextCommandNotification(notification); + } + } + } catch (WakeupException e) { + // Ignore exception if closing + if (!closed.get()) throw e; + } finally { + consumer.close(); + } + } + + public void shutdown() { + closed.set(true); + consumer.wakeup(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/periodic.notification/service/src/test/java/org/apache/rya/periodic/notification/serialization/CommandNotificationSerializerTest.java ---------------------------------------------------------------------- diff --git a/extras/periodic.notification/service/src/test/java/org/apache/rya/periodic/notification/serialization/CommandNotificationSerializerTest.java b/extras/periodic.notification/service/src/test/java/org/apache/rya/periodic/notification/serialization/CommandNotificationSerializerTest.java new file mode 100644 index 0000000..4aad1c6 --- /dev/null +++ b/extras/periodic.notification/service/src/test/java/org/apache/rya/periodic/notification/serialization/CommandNotificationSerializerTest.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.periodic.notification.serialization; + +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import org.apache.rya.periodic.notification.notification.BasicNotification; +import org.apache.rya.periodic.notification.notification.CommandNotification; +import org.apache.rya.periodic.notification.notification.CommandNotification.Command; +import org.apache.rya.periodic.notification.notification.PeriodicNotification; +import org.apache.rya.periodic.notification.serialization.CommandNotificationSerializer; +import org.junit.Assert; +import org.junit.Test; + +public class CommandNotificationSerializerTest { + + private CommandNotificationSerializer serializer = new CommandNotificationSerializer(); + private static final String topic = "topic"; + + @Test + public void basicSerializationTest() { + PeriodicNotification notification = PeriodicNotification.builder().id(UUID.randomUUID().toString()).period(24) + .timeUnit(TimeUnit.DAYS).initialDelay(1).build(); + CommandNotification command = new CommandNotification(Command.ADD, notification); + Assert.assertEquals(command, serializer.deserialize(topic,serializer.serialize(topic, command))); + + PeriodicNotification notification1 = PeriodicNotification.builder().id(UUID.randomUUID().toString()).period(32) + .timeUnit(TimeUnit.SECONDS).initialDelay(15).build(); + CommandNotification command1 = new CommandNotification(Command.ADD, notification1); + Assert.assertEquals(command1, serializer.deserialize(topic,serializer.serialize(topic,command1))); + + PeriodicNotification notification2 = PeriodicNotification.builder().id(UUID.randomUUID().toString()).period(32) + .timeUnit(TimeUnit.SECONDS).initialDelay(15).build(); + CommandNotification command2 = new CommandNotification(Command.ADD, notification2); + Assert.assertEquals(command2, serializer.deserialize(topic,serializer.serialize(topic,command2))); + + BasicNotification notification3 = new BasicNotification(UUID.randomUUID().toString()); + CommandNotification command3 = new CommandNotification(Command.ADD, notification3); + Assert.assertEquals(command3, serializer.deserialize(topic,serializer.serialize(topic,command3))); + + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/periodic.notification/tests/pom.xml ---------------------------------------------------------------------- diff --git a/extras/periodic.notification/tests/pom.xml b/extras/periodic.notification/tests/pom.xml new file mode 100644 index 0000000..31a6c0e --- /dev/null +++ b/extras/periodic.notification/tests/pom.xml @@ -0,0 +1,62 @@ +<?xml version="1.0" encoding="utf-8"?> +<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor + license agreements. See the NOTICE file distributed with this work for additional + information regarding copyright ownership. The ASF licenses this file to + you under the Apache License, Version 2.0 (the "License"); you may not use + this file except in compliance with the License. You may obtain a copy of + the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required + by applicable law or agreed to in writing, software distributed under the + License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS + OF ANY KIND, either express or implied. See the License for the specific + language governing permissions and limitations under the License. --> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.rya</groupId> + <artifactId>rya.periodic.notification.parent</artifactId> + <version>3.2.11-incubating-SNAPSHOT</version> + </parent> + + <artifactId>rya.periodic.notification.tests</artifactId> + + <name>Apache Rya Periodic Notification Service Integration Tests</name> + <description>Integration Tests for Rya Periodic Notification Service</description> + + <dependencies> + <dependency> + <groupId>org.apache.rya</groupId> + <artifactId>rya.pcj.fluo.test.base</artifactId> + <exclusions> + <exclusion> + <artifactId>log4j-1.2-api</artifactId> + <groupId>org.apache.logging.log4j</groupId> + </exclusion> + <exclusion> + <artifactId>log4j-api</artifactId> + <groupId>org.apache.logging.log4j</groupId> + </exclusion> + <exclusion> + <artifactId>log4j-core</artifactId> + <groupId>org.apache.logging.log4j</groupId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.rya</groupId> + <artifactId>rya.periodic.notification.service</artifactId> + <exclusions> + <exclusion> + <artifactId>logback-classic</artifactId> + <groupId>ch.qos.logback</groupId> + </exclusion> + <exclusion> + <artifactId>logback-core</artifactId> + <groupId>ch.qos.logback</groupId> + </exclusion> + </exclusions> + </dependency> + </dependencies> + +</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/periodic.notification/tests/src/test/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationIT.java ---------------------------------------------------------------------- diff --git a/extras/periodic.notification/tests/src/test/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationIT.java b/extras/periodic.notification/tests/src/test/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationIT.java new file mode 100644 index 0000000..9109775 --- /dev/null +++ b/extras/periodic.notification/tests/src/test/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationIT.java @@ -0,0 +1,493 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.periodic.notification.application; + +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.time.ZonedDateTime; +import java.time.format.DateTimeFormatter; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Properties; +import java.util.Set; +import java.util.UUID; + +import javax.xml.datatype.DatatypeConfigurationException; +import javax.xml.datatype.DatatypeFactory; + +import org.apache.accumulo.core.client.Connector; +import org.apache.fluo.api.client.FluoClient; +import org.apache.fluo.api.config.FluoConfiguration; +import org.apache.fluo.core.client.FluoClientImpl; +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.rya.api.resolver.RdfToRyaConversions; +import org.apache.rya.indexing.accumulo.ConfigUtils; +import org.apache.rya.indexing.pcj.fluo.api.CreatePeriodicQuery; +import org.apache.rya.indexing.pcj.fluo.api.InsertTriples; +import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants; +import org.apache.rya.indexing.pcj.fluo.app.util.FluoClientFactory; +import org.apache.rya.indexing.pcj.fluo.app.util.FluoQueryUtils; +import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage; +import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator; +import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPeriodicQueryResultStorage; +import org.apache.rya.kafka.base.EmbeddedKafkaInstance; +import org.apache.rya.kafka.base.EmbeddedKafkaSingleton; +import org.apache.rya.kafka.base.KafkaTestInstanceRule; +import org.apache.rya.pcj.fluo.test.base.RyaExportITBase; +import org.apache.rya.periodic.notification.notification.CommandNotification; +import org.apache.rya.periodic.notification.registration.KafkaNotificationRegistrationClient; +import org.apache.rya.periodic.notification.serialization.BindingSetSerDe; +import org.apache.rya.periodic.notification.serialization.CommandNotificationSerializer; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.openrdf.model.Statement; +import org.openrdf.model.Value; +import org.openrdf.model.ValueFactory; +import org.openrdf.model.impl.LiteralImpl; +import org.openrdf.model.impl.ValueFactoryImpl; +import org.openrdf.model.vocabulary.XMLSchema; +import org.openrdf.query.BindingSet; +import org.openrdf.query.algebra.evaluation.QueryBindingSet; + +import com.google.common.collect.HashMultimap; +import com.google.common.collect.Multimap; +import com.google.common.collect.Sets; + +import static org.apache.rya.periodic.notification.application.PeriodicNotificationApplicationConfiguration.NOTIFICATION_TOPIC; +import static org.apache.rya.periodic.notification.application.PeriodicNotificationApplicationConfiguration.KAFKA_BOOTSTRAP_SERVERS;; + + +public class PeriodicNotificationApplicationIT extends RyaExportITBase { + + private PeriodicNotificationApplication app; + private KafkaNotificationRegistrationClient registrar; + private KafkaProducer<String, CommandNotification> producer; + private Properties props; + private Properties kafkaProps; + private PeriodicNotificationApplicationConfiguration conf; + private static EmbeddedKafkaInstance embeddedKafka = EmbeddedKafkaSingleton.getInstance(); + private static String bootstrapServers; + + @Rule + public KafkaTestInstanceRule rule = new KafkaTestInstanceRule(false); + + @BeforeClass + public static void initClass() { + bootstrapServers = embeddedKafka.createBootstrapServerConfig().getProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG); + } + + @Before + public void init() throws Exception { + String topic = rule.getKafkaTopicName(); + rule.createTopic(topic); + + //get user specified props and update with the embedded kafka bootstrap servers and rule generated topic + props = getProps(); + props.setProperty(NOTIFICATION_TOPIC, topic); + props.setProperty(KAFKA_BOOTSTRAP_SERVERS, bootstrapServers); + conf = new PeriodicNotificationApplicationConfiguration(props); + + //create Kafka Producer + kafkaProps = getKafkaProperties(conf); + producer = new KafkaProducer<>(kafkaProps, new StringSerializer(), new CommandNotificationSerializer()); + + //extract kafka specific properties from application config + app = PeriodicNotificationApplicationFactory.getPeriodicApplication(props); + registrar = new KafkaNotificationRegistrationClient(conf.getNotificationTopic(), producer); + } + + @Test + public void periodicApplicationWithAggAndGroupByTest() throws Exception { + + String sparql = "prefix function: <http://org.apache.rya/function#> " // n + + "prefix time: <http://www.w3.org/2006/time#> " // n + + "select ?type (count(?obs) as ?total) where {" // n + + "Filter(function:periodic(?time, 1, .25, time:minutes)) " // n + + "?obs <uri:hasTime> ?time. " // n + + "?obs <uri:hasObsType> ?type } group by ?type"; // n + + //make data + int periodMult = 15; + final ValueFactory vf = new ValueFactoryImpl(); + final DatatypeFactory dtf = DatatypeFactory.newInstance(); + //Sleep until current time aligns nicely with period to makell + //results more predictable + while(System.currentTimeMillis() % (periodMult*1000) > 500); + ZonedDateTime time = ZonedDateTime.now(); + + ZonedDateTime zTime1 = time.minusSeconds(2*periodMult); + String time1 = zTime1.format(DateTimeFormatter.ISO_INSTANT); + + ZonedDateTime zTime2 = zTime1.minusSeconds(periodMult); + String time2 = zTime2.format(DateTimeFormatter.ISO_INSTANT); + + ZonedDateTime zTime3 = zTime2.minusSeconds(periodMult); + String time3 = zTime3.format(DateTimeFormatter.ISO_INSTANT); + + final Collection<Statement> statements = Sets.newHashSet( + vf.createStatement(vf.createURI("urn:obs_1"), vf.createURI("uri:hasTime"), + vf.createLiteral(dtf.newXMLGregorianCalendar(time1))), + vf.createStatement(vf.createURI("urn:obs_1"), vf.createURI("uri:hasObsType"), vf.createLiteral("ship")), + vf.createStatement(vf.createURI("urn:obs_2"), vf.createURI("uri:hasTime"), + vf.createLiteral(dtf.newXMLGregorianCalendar(time1))), + vf.createStatement(vf.createURI("urn:obs_2"), vf.createURI("uri:hasObsType"), vf.createLiteral("airplane")), + vf.createStatement(vf.createURI("urn:obs_3"), vf.createURI("uri:hasTime"), + vf.createLiteral(dtf.newXMLGregorianCalendar(time2))), + vf.createStatement(vf.createURI("urn:obs_3"), vf.createURI("uri:hasObsType"), vf.createLiteral("ship")), + vf.createStatement(vf.createURI("urn:obs_4"), vf.createURI("uri:hasTime"), + vf.createLiteral(dtf.newXMLGregorianCalendar(time2))), + vf.createStatement(vf.createURI("urn:obs_4"), vf.createURI("uri:hasObsType"), vf.createLiteral("airplane")), + vf.createStatement(vf.createURI("urn:obs_5"), vf.createURI("uri:hasTime"), + vf.createLiteral(dtf.newXMLGregorianCalendar(time3))), + vf.createStatement(vf.createURI("urn:obs_5"), vf.createURI("uri:hasObsType"), vf.createLiteral("automobile"))); + + try (FluoClient fluo = FluoClientFactory.getFluoClient(conf.getFluoAppName(), Optional.of(conf.getFluoTableName()), conf)) { + Connector connector = ConfigUtils.getConnector(conf); + PeriodicQueryResultStorage storage = new AccumuloPeriodicQueryResultStorage(connector, conf.getTablePrefix()); + CreatePeriodicQuery periodicQuery = new CreatePeriodicQuery(fluo, storage); + String id = FluoQueryUtils.convertFluoQueryIdToPcjId(periodicQuery.createPeriodicQuery(sparql, registrar).getQueryId()); + addData(statements); + app.start(); + + Multimap<Long, BindingSet> actual = HashMultimap.create(); + try (KafkaConsumer<String, BindingSet> consumer = new KafkaConsumer<>(kafkaProps, new StringDeserializer(), new BindingSetSerDe())) { + consumer.subscribe(Arrays.asList(id)); + long end = System.currentTimeMillis() + 4*periodMult*1000; + long lastBinId = 0L; + long binId = 0L; + List<Long> ids = new ArrayList<>(); + while (System.currentTimeMillis() < end) { + ConsumerRecords<String, BindingSet> records = consumer.poll(periodMult*1000); + for(ConsumerRecord<String, BindingSet> record: records){ + BindingSet result = record.value(); + binId = Long.parseLong(result.getBinding(IncrementalUpdateConstants.PERIODIC_BIN_ID).getValue().stringValue()); + if(lastBinId != binId) { + lastBinId = binId; + ids.add(binId); + } + actual.put(binId, result); + } + } + + Map<Long, Set<BindingSet>> expected = new HashMap<>(); + + Set<BindingSet> expected1 = new HashSet<>(); + QueryBindingSet bs1 = new QueryBindingSet(); + bs1.addBinding(IncrementalUpdateConstants.PERIODIC_BIN_ID, vf.createLiteral(ids.get(0))); + bs1.addBinding("total", new LiteralImpl("2", XMLSchema.INTEGER)); + bs1.addBinding("type", vf.createLiteral("airplane")); + + QueryBindingSet bs2 = new QueryBindingSet(); + bs2.addBinding(IncrementalUpdateConstants.PERIODIC_BIN_ID, vf.createLiteral(ids.get(0))); + bs2.addBinding("total", new LiteralImpl("2", XMLSchema.INTEGER)); + bs2.addBinding("type", vf.createLiteral("ship")); + + QueryBindingSet bs3 = new QueryBindingSet(); + bs3.addBinding(IncrementalUpdateConstants.PERIODIC_BIN_ID, vf.createLiteral(ids.get(0))); + bs3.addBinding("total", new LiteralImpl("1", XMLSchema.INTEGER)); + bs3.addBinding("type", vf.createLiteral("automobile")); + + expected1.add(bs1); + expected1.add(bs2); + expected1.add(bs3); + + Set<BindingSet> expected2 = new HashSet<>(); + QueryBindingSet bs4 = new QueryBindingSet(); + bs4.addBinding(IncrementalUpdateConstants.PERIODIC_BIN_ID, vf.createLiteral(ids.get(1))); + bs4.addBinding("total", new LiteralImpl("2", XMLSchema.INTEGER)); + bs4.addBinding("type", vf.createLiteral("airplane")); + + QueryBindingSet bs5 = new QueryBindingSet(); + bs5.addBinding(IncrementalUpdateConstants.PERIODIC_BIN_ID, vf.createLiteral(ids.get(1))); + bs5.addBinding("total", new LiteralImpl("2", XMLSchema.INTEGER)); + bs5.addBinding("type", vf.createLiteral("ship")); + + expected2.add(bs4); + expected2.add(bs5); + + Set<BindingSet> expected3 = new HashSet<>(); + QueryBindingSet bs6 = new QueryBindingSet(); + bs6.addBinding(IncrementalUpdateConstants.PERIODIC_BIN_ID, vf.createLiteral(ids.get(2))); + bs6.addBinding("total", new LiteralImpl("1", XMLSchema.INTEGER)); + bs6.addBinding("type", vf.createLiteral("ship")); + + QueryBindingSet bs7 = new QueryBindingSet(); + bs7.addBinding(IncrementalUpdateConstants.PERIODIC_BIN_ID, vf.createLiteral(ids.get(2))); + bs7.addBinding("total", new LiteralImpl("1", XMLSchema.INTEGER)); + bs7.addBinding("type", vf.createLiteral("airplane")); + + expected3.add(bs6); + expected3.add(bs7); + + expected.put(ids.get(0), expected1); + expected.put(ids.get(1), expected2); + expected.put(ids.get(2), expected3); + + Assert.assertEquals(3, actual.asMap().size()); + for(Long ident: ids) { + Assert.assertEquals(expected.get(ident), actual.get(ident)); + } + } + + Set<BindingSet> expectedResults = new HashSet<>(); + try (CloseableIterator<BindingSet> results = storage.listResults(id, Optional.empty())) { + results.forEachRemaining(x -> expectedResults.add(x)); + Assert.assertEquals(0, expectedResults.size()); + } + } + } + + + @Test + public void periodicApplicationWithAggTest() throws Exception { + + String sparql = "prefix function: <http://org.apache.rya/function#> " // n + + "prefix time: <http://www.w3.org/2006/time#> " // n + + "select (count(?obs) as ?total) where {" // n + + "Filter(function:periodic(?time, 1, .25, time:minutes)) " // n + + "?obs <uri:hasTime> ?time. " // n + + "?obs <uri:hasId> ?id } "; // n + + //make data + int periodMult = 15; + final ValueFactory vf = new ValueFactoryImpl(); + final DatatypeFactory dtf = DatatypeFactory.newInstance(); + //Sleep until current time aligns nicely with period to make + //results more predictable + while(System.currentTimeMillis() % (periodMult*1000) > 500); + ZonedDateTime time = ZonedDateTime.now(); + + ZonedDateTime zTime1 = time.minusSeconds(2*periodMult); + String time1 = zTime1.format(DateTimeFormatter.ISO_INSTANT); + + ZonedDateTime zTime2 = zTime1.minusSeconds(periodMult); + String time2 = zTime2.format(DateTimeFormatter.ISO_INSTANT); + + ZonedDateTime zTime3 = zTime2.minusSeconds(periodMult); + String time3 = zTime3.format(DateTimeFormatter.ISO_INSTANT); + + final Collection<Statement> statements = Sets.newHashSet( + vf.createStatement(vf.createURI("urn:obs_1"), vf.createURI("uri:hasTime"), + vf.createLiteral(dtf.newXMLGregorianCalendar(time1))), + vf.createStatement(vf.createURI("urn:obs_1"), vf.createURI("uri:hasId"), vf.createLiteral("id_1")), + vf.createStatement(vf.createURI("urn:obs_2"), vf.createURI("uri:hasTime"), + vf.createLiteral(dtf.newXMLGregorianCalendar(time2))), + vf.createStatement(vf.createURI("urn:obs_2"), vf.createURI("uri:hasId"), vf.createLiteral("id_2")), + vf.createStatement(vf.createURI("urn:obs_3"), vf.createURI("uri:hasTime"), + vf.createLiteral(dtf.newXMLGregorianCalendar(time3))), + vf.createStatement(vf.createURI("urn:obs_3"), vf.createURI("uri:hasId"), vf.createLiteral("id_3"))); + + try (FluoClient fluo = FluoClientFactory.getFluoClient(conf.getFluoAppName(), Optional.of(conf.getFluoTableName()), conf)) { + Connector connector = ConfigUtils.getConnector(conf); + PeriodicQueryResultStorage storage = new AccumuloPeriodicQueryResultStorage(connector, conf.getTablePrefix()); + CreatePeriodicQuery periodicQuery = new CreatePeriodicQuery(fluo, storage); + String id = FluoQueryUtils.convertFluoQueryIdToPcjId(periodicQuery.createPeriodicQuery(sparql, registrar).getQueryId()); + addData(statements); + app.start(); + + Multimap<Long, BindingSet> expected = HashMultimap.create(); + try (KafkaConsumer<String, BindingSet> consumer = new KafkaConsumer<>(kafkaProps, new StringDeserializer(), new BindingSetSerDe())) { + consumer.subscribe(Arrays.asList(id)); + long end = System.currentTimeMillis() + 4*periodMult*1000; + long lastBinId = 0L; + long binId = 0L; + List<Long> ids = new ArrayList<>(); + while (System.currentTimeMillis() < end) { + ConsumerRecords<String, BindingSet> records = consumer.poll(periodMult*1000); + for(ConsumerRecord<String, BindingSet> record: records){ + BindingSet result = record.value(); + binId = Long.parseLong(result.getBinding(IncrementalUpdateConstants.PERIODIC_BIN_ID).getValue().stringValue()); + if(lastBinId != binId) { + lastBinId = binId; + ids.add(binId); + } + expected.put(binId, result); + } + } + + Assert.assertEquals(3, expected.asMap().size()); + int i = 0; + for(Long ident: ids) { + Assert.assertEquals(1, expected.get(ident).size()); + BindingSet bs = expected.get(ident).iterator().next(); + Value val = bs.getValue("total"); + int total = Integer.parseInt(val.stringValue()); + Assert.assertEquals(3-i, total); + i++; + } + } + + + Set<BindingSet> expectedResults = new HashSet<>(); + try (CloseableIterator<BindingSet> results = storage.listResults(id, Optional.empty())) { + results.forEachRemaining(x -> expectedResults.add(x)); + Assert.assertEquals(0, expectedResults.size()); + } + } + + } + + + @Test + public void periodicApplicationTest() throws Exception { + + String sparql = "prefix function: <http://org.apache.rya/function#> " // n + + "prefix time: <http://www.w3.org/2006/time#> " // n + + "select ?obs ?id where {" // n + + "Filter(function:periodic(?time, 1, .25, time:minutes)) " // n + + "?obs <uri:hasTime> ?time. " // n + + "?obs <uri:hasId> ?id } "; // n + + //make data + int periodMult = 15; + final ValueFactory vf = new ValueFactoryImpl(); + final DatatypeFactory dtf = DatatypeFactory.newInstance(); + //Sleep until current time aligns nicely with period to make + //results more predictable + while(System.currentTimeMillis() % (periodMult*1000) > 500); + ZonedDateTime time = ZonedDateTime.now(); + + ZonedDateTime zTime1 = time.minusSeconds(2*periodMult); + String time1 = zTime1.format(DateTimeFormatter.ISO_INSTANT); + + ZonedDateTime zTime2 = zTime1.minusSeconds(periodMult); + String time2 = zTime2.format(DateTimeFormatter.ISO_INSTANT); + + ZonedDateTime zTime3 = zTime2.minusSeconds(periodMult); + String time3 = zTime3.format(DateTimeFormatter.ISO_INSTANT); + + final Collection<Statement> statements = Sets.newHashSet( + vf.createStatement(vf.createURI("urn:obs_1"), vf.createURI("uri:hasTime"), + vf.createLiteral(dtf.newXMLGregorianCalendar(time1))), + vf.createStatement(vf.createURI("urn:obs_1"), vf.createURI("uri:hasId"), vf.createLiteral("id_1")), + vf.createStatement(vf.createURI("urn:obs_2"), vf.createURI("uri:hasTime"), + vf.createLiteral(dtf.newXMLGregorianCalendar(time2))), + vf.createStatement(vf.createURI("urn:obs_2"), vf.createURI("uri:hasId"), vf.createLiteral("id_2")), + vf.createStatement(vf.createURI("urn:obs_3"), vf.createURI("uri:hasTime"), + vf.createLiteral(dtf.newXMLGregorianCalendar(time3))), + vf.createStatement(vf.createURI("urn:obs_3"), vf.createURI("uri:hasId"), vf.createLiteral("id_3"))); + + try (FluoClient fluo = FluoClientFactory.getFluoClient(conf.getFluoAppName(), Optional.of(conf.getFluoTableName()), conf)) { + Connector connector = ConfigUtils.getConnector(conf); + PeriodicQueryResultStorage storage = new AccumuloPeriodicQueryResultStorage(connector, conf.getTablePrefix()); + CreatePeriodicQuery periodicQuery = new CreatePeriodicQuery(fluo, storage); + String id = FluoQueryUtils.convertFluoQueryIdToPcjId(periodicQuery.createPeriodicQuery(sparql, registrar).getQueryId()); + addData(statements); + app.start(); + + Multimap<Long, BindingSet> expected = HashMultimap.create(); + try (KafkaConsumer<String, BindingSet> consumer = new KafkaConsumer<>(kafkaProps, new StringDeserializer(), new BindingSetSerDe())) { + consumer.subscribe(Arrays.asList(id)); + long end = System.currentTimeMillis() + 4*periodMult*1000; + long lastBinId = 0L; + long binId = 0L; + List<Long> ids = new ArrayList<>(); + while (System.currentTimeMillis() < end) { + ConsumerRecords<String, BindingSet> records = consumer.poll(periodMult*1000); + for(ConsumerRecord<String, BindingSet> record: records){ + BindingSet result = record.value(); + binId = Long.parseLong(result.getBinding(IncrementalUpdateConstants.PERIODIC_BIN_ID).getValue().stringValue()); + if(lastBinId != binId) { + lastBinId = binId; + ids.add(binId); + } + expected.put(binId, result); + } + } + + Assert.assertEquals(3, expected.asMap().size()); + int i = 0; + for(Long ident: ids) { + Assert.assertEquals(3-i, expected.get(ident).size()); + i++; + } + } + + + Set<BindingSet> expectedResults = new HashSet<>(); + try (CloseableIterator<BindingSet> results = storage.listResults(id, Optional.empty())) { + results.forEachRemaining(x -> expectedResults.add(x)); + Assert.assertEquals(0, expectedResults.size()); + } + } + + } + + + @After + public void shutdown() { + registrar.close(); + app.stop(); + } + + private void addData(Collection<Statement> statements) throws DatatypeConfigurationException { + // add statements to Fluo + try (FluoClient fluo = new FluoClientImpl(getFluoConfiguration())) { + InsertTriples inserter = new InsertTriples(); + statements.forEach(x -> inserter.insert(fluo, RdfToRyaConversions.convertStatement(x))); + getMiniFluo().waitForObservers(); + } + } + + private static Properties getKafkaProperties(PeriodicNotificationApplicationConfiguration conf) { + Properties kafkaProps = new Properties(); + kafkaProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + kafkaProps.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString()); + kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, conf.getNotificationGroupId()); + kafkaProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + return kafkaProps; + } + + private Properties getProps() throws IOException { + + Properties props = new Properties(); + try(InputStream in = new FileInputStream("src/test/resources/notification.properties")) { + props.load(in); + } + + FluoConfiguration fluoConf = getFluoConfiguration(); + props.setProperty("accumulo.user", getUsername()); + props.setProperty("accumulo.password", getPassword()); + props.setProperty("accumulo.instance", getMiniAccumuloCluster().getInstanceName()); + props.setProperty("accumulo.zookeepers", getMiniAccumuloCluster().getZooKeepers()); + props.setProperty("accumulo.rya.prefix", getRyaInstanceName()); + props.setProperty(PeriodicNotificationApplicationConfiguration.FLUO_APP_NAME, fluoConf.getApplicationName()); + props.setProperty(PeriodicNotificationApplicationConfiguration.FLUO_TABLE_NAME, fluoConf.getAccumuloTable()); + return props; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/periodic.notification/tests/src/test/java/org/apache/rya/periodic/notification/application/PeriodicNotificationProviderIT.java ---------------------------------------------------------------------- diff --git a/extras/periodic.notification/tests/src/test/java/org/apache/rya/periodic/notification/application/PeriodicNotificationProviderIT.java b/extras/periodic.notification/tests/src/test/java/org/apache/rya/periodic/notification/application/PeriodicNotificationProviderIT.java new file mode 100644 index 0000000..e05ca6f --- /dev/null +++ b/extras/periodic.notification/tests/src/test/java/org/apache/rya/periodic/notification/application/PeriodicNotificationProviderIT.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.periodic.notification.application; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +import org.apache.fluo.api.client.FluoClient; +import org.apache.fluo.core.client.FluoClientImpl; +import org.apache.fluo.recipes.test.AccumuloExportITBase; +import org.apache.rya.indexing.pcj.fluo.api.CreateFluoPcj; +import org.apache.rya.indexing.pcj.fluo.app.query.UnsupportedQueryException; +import org.apache.rya.indexing.pcj.fluo.app.util.FluoQueryUtils; +import org.apache.rya.periodic.notification.coordinator.PeriodicNotificationCoordinatorExecutor; +import org.apache.rya.periodic.notification.notification.TimestampedNotification; +import org.apache.rya.periodic.notification.recovery.PeriodicNotificationProvider; +import org.junit.Assert; +import org.junit.Test; +import org.openrdf.query.MalformedQueryException; + +import com.google.common.collect.Sets; + +public class PeriodicNotificationProviderIT extends AccumuloExportITBase { + + @Test + public void testProvider() throws MalformedQueryException, InterruptedException, UnsupportedQueryException { + + String sparql = "prefix function: <http://org.apache.rya/function#> " // n + + "prefix time: <http://www.w3.org/2006/time#> " // n + + "select ?id (count(?obs) as ?total) where {" // n + + "Filter(function:periodic(?time, 1, .25, time:minutes)) " // n + + "?obs <uri:hasTime> ?time. " // n + + "?obs <uri:hasId> ?id } group by ?id"; // n + + BlockingQueue<TimestampedNotification> notifications = new LinkedBlockingQueue<>(); + PeriodicNotificationCoordinatorExecutor coord = new PeriodicNotificationCoordinatorExecutor(2, notifications); + PeriodicNotificationProvider provider = new PeriodicNotificationProvider(); + CreateFluoPcj pcj = new CreateFluoPcj(); + + String id = null; + try(FluoClient fluo = new FluoClientImpl(getFluoConfiguration())) { + id = pcj.createPcj(FluoQueryUtils.createNewPcjId(), sparql, Sets.newHashSet(), fluo).getQueryId(); + provider.processRegisteredNotifications(coord, fluo.newSnapshot()); + } + + TimestampedNotification notification = notifications.take(); + Assert.assertEquals(5000, notification.getInitialDelay()); + Assert.assertEquals(15000, notification.getPeriod()); + Assert.assertEquals(TimeUnit.MILLISECONDS, notification.getTimeUnit()); + Assert.assertEquals(FluoQueryUtils.convertFluoQueryIdToPcjId(id), notification.getId()); + + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/periodic.notification/tests/src/test/java/org/apache/rya/periodic/notification/exporter/PeriodicNotificationExporterIT.java ---------------------------------------------------------------------- diff --git a/extras/periodic.notification/tests/src/test/java/org/apache/rya/periodic/notification/exporter/PeriodicNotificationExporterIT.java b/extras/periodic.notification/tests/src/test/java/org/apache/rya/periodic/notification/exporter/PeriodicNotificationExporterIT.java new file mode 100644 index 0000000..874e7e2 --- /dev/null +++ b/extras/periodic.notification/tests/src/test/java/org/apache/rya/periodic/notification/exporter/PeriodicNotificationExporterIT.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.periodic.notification.exporter; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.Properties; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage; +import org.apache.rya.kafka.base.KafkaITBase; +import org.apache.rya.kafka.base.KafkaTestInstanceRule; +import org.apache.rya.periodic.notification.api.BindingSetRecord; +import org.apache.rya.periodic.notification.serialization.BindingSetSerDe; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.openrdf.model.ValueFactory; +import org.openrdf.model.impl.ValueFactoryImpl; +import org.openrdf.query.BindingSet; +import org.openrdf.query.algebra.evaluation.QueryBindingSet; + +public class PeriodicNotificationExporterIT extends KafkaITBase { + + + @Rule + public KafkaTestInstanceRule kafkaTestInstanceRule = new KafkaTestInstanceRule(false); + + + private static final ValueFactory vf = new ValueFactoryImpl(); + + @Test + public void testExporter() throws InterruptedException { + + final String topic1 = kafkaTestInstanceRule.getKafkaTopicName() + "1"; + final String topic2 = kafkaTestInstanceRule.getKafkaTopicName() + "2"; + + kafkaTestInstanceRule.createTopic(topic1); + kafkaTestInstanceRule.createTopic(topic2); + + final BlockingQueue<BindingSetRecord> records = new LinkedBlockingQueue<>(); + + final KafkaExporterExecutor exporter = new KafkaExporterExecutor(new KafkaProducer<String, BindingSet>(createKafkaProducerConfig()), 1, records); + exporter.start(); + final QueryBindingSet bs1 = new QueryBindingSet(); + bs1.addBinding(PeriodicQueryResultStorage.PeriodicBinId, vf.createLiteral(1L)); + bs1.addBinding("name", vf.createURI("uri:Bob")); + final BindingSetRecord record1 = new BindingSetRecord(bs1, topic1); + + final QueryBindingSet bs2 = new QueryBindingSet(); + bs2.addBinding(PeriodicQueryResultStorage.PeriodicBinId, vf.createLiteral(2L)); + bs2.addBinding("name", vf.createURI("uri:Joe")); + final BindingSetRecord record2 = new BindingSetRecord(bs2, topic2); + + records.add(record1); + records.add(record2); + + final Set<BindingSet> expected1 = new HashSet<>(); + expected1.add(bs1); + final Set<BindingSet> expected2 = new HashSet<>(); + expected2.add(bs2); + + final Set<BindingSet> actual1 = getBindingSetsFromKafka(topic1); + final Set<BindingSet> actual2 = getBindingSetsFromKafka(topic2); + + Assert.assertEquals(expected1, actual1); + Assert.assertEquals(expected2, actual2); + + exporter.stop(); + } + + + private Properties createKafkaProducerConfig() { + final Properties props = createBootstrapServerConfig(); + props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, BindingSetSerDe.class.getName()); + return props; + } + private Properties createKafkaConsumerConfig() { + final Properties props = createBootstrapServerConfig(); + props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString()); + props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "consumer0"); + props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BindingSetSerDe.class.getName()); + return props; + } + + + private KafkaConsumer<String, BindingSet> makeBindingSetConsumer(final String topicName) { + // setup consumer + final KafkaConsumer<String, BindingSet> consumer = new KafkaConsumer<>(createKafkaConsumerConfig()); + consumer.subscribe(Arrays.asList(topicName)); + return consumer; + } + + private Set<BindingSet> getBindingSetsFromKafka(final String topicName) { + KafkaConsumer<String, BindingSet> consumer = null; + + try { + consumer = makeBindingSetConsumer(topicName); + final ConsumerRecords<String, BindingSet> records = consumer.poll(20000); // Wait up to 20 seconds for a result to be published. + + final Set<BindingSet> bindingSets = new HashSet<>(); + records.forEach(x -> bindingSets.add(x.value())); + + return bindingSets; + + } catch (final Exception e) { + throw new RuntimeException(e); + } finally { + if (consumer != null) { + consumer.close(); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/periodic.notification/tests/src/test/java/org/apache/rya/periodic/notification/processor/PeriodicNotificationProcessorIT.java ---------------------------------------------------------------------- diff --git a/extras/periodic.notification/tests/src/test/java/org/apache/rya/periodic/notification/processor/PeriodicNotificationProcessorIT.java b/extras/periodic.notification/tests/src/test/java/org/apache/rya/periodic/notification/processor/PeriodicNotificationProcessorIT.java new file mode 100644 index 0000000..21109ae --- /dev/null +++ b/extras/periodic.notification/tests/src/test/java/org/apache/rya/periodic/notification/processor/PeriodicNotificationProcessorIT.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.periodic.notification.processor; + +import java.util.HashSet; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +import org.apache.fluo.recipes.test.AccumuloExportITBase; +import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage; +import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPeriodicQueryResultStorage; +import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; +import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; +import org.apache.rya.periodic.notification.api.BindingSetRecord; +import org.apache.rya.periodic.notification.api.NodeBin; +import org.apache.rya.periodic.notification.notification.PeriodicNotification; +import org.apache.rya.periodic.notification.notification.TimestampedNotification; +import org.junit.Assert; +import org.junit.Test; +import org.openrdf.model.ValueFactory; +import org.openrdf.model.impl.ValueFactoryImpl; +import org.openrdf.query.BindingSet; +import org.openrdf.query.algebra.evaluation.QueryBindingSet; + +public class PeriodicNotificationProcessorIT extends AccumuloExportITBase { + + private static final ValueFactory vf = new ValueFactoryImpl(); + private static final String RYA_INSTANCE_NAME = "rya_"; + + @Test + public void periodicProcessorTest() throws Exception { + + String id = UUID.randomUUID().toString().replace("-", ""); + BlockingQueue<TimestampedNotification> notifications = new LinkedBlockingQueue<>(); + BlockingQueue<NodeBin> bins = new LinkedBlockingQueue<>(); + BlockingQueue<BindingSetRecord> bindingSets = new LinkedBlockingQueue<>(); + + TimestampedNotification ts1 = new TimestampedNotification( + PeriodicNotification.builder().id(id).initialDelay(0).period(2000).timeUnit(TimeUnit.SECONDS).build()); + long binId1 = (ts1.getTimestamp().getTime()/ts1.getPeriod())*ts1.getPeriod(); + + Thread.sleep(2000); + + TimestampedNotification ts2 = new TimestampedNotification( + PeriodicNotification.builder().id(id).initialDelay(0).period(2000).timeUnit(TimeUnit.SECONDS).build()); + long binId2 = (ts2.getTimestamp().getTime()/ts2.getPeriod())*ts2.getPeriod(); + + Set<NodeBin> expectedBins = new HashSet<>(); + expectedBins.add(new NodeBin(id, binId1)); + expectedBins.add(new NodeBin(id, binId2)); + + Set<BindingSet> expected = new HashSet<>(); + Set<VisibilityBindingSet> storageResults = new HashSet<>(); + + QueryBindingSet bs1 = new QueryBindingSet(); + bs1.addBinding("periodicBinId", vf.createLiteral(binId1)); + bs1.addBinding("id", vf.createLiteral(1)); + expected.add(bs1); + storageResults.add(new VisibilityBindingSet(bs1)); + + QueryBindingSet bs2 = new QueryBindingSet(); + bs2.addBinding("periodicBinId", vf.createLiteral(binId1)); + bs2.addBinding("id", vf.createLiteral(2)); + expected.add(bs2); + storageResults.add(new VisibilityBindingSet(bs2)); + + QueryBindingSet bs3 = new QueryBindingSet(); + bs3.addBinding("periodicBinId", vf.createLiteral(binId2)); + bs3.addBinding("id", vf.createLiteral(3)); + expected.add(bs3); + storageResults.add(new VisibilityBindingSet(bs3)); + + QueryBindingSet bs4 = new QueryBindingSet(); + bs4.addBinding("periodicBinId", vf.createLiteral(binId2)); + bs4.addBinding("id", vf.createLiteral(4)); + expected.add(bs4); + storageResults.add(new VisibilityBindingSet(bs4)); + + PeriodicQueryResultStorage periodicStorage = new AccumuloPeriodicQueryResultStorage(super.getAccumuloConnector(), + RYA_INSTANCE_NAME); + periodicStorage.createPeriodicQuery(id, "select ?id where {?obs <urn:hasId> ?id.}", new VariableOrder("periodicBinId", "id")); + periodicStorage.addPeriodicQueryResults(id, storageResults); + + NotificationProcessorExecutor processor = new NotificationProcessorExecutor(periodicStorage, notifications, bins, bindingSets, 1); + processor.start(); + + notifications.add(ts1); + notifications.add(ts2); + + Thread.sleep(5000); + + Assert.assertEquals(expectedBins.size(), bins.size()); + Assert.assertEquals(true, bins.containsAll(expectedBins)); + + Set<BindingSet> actual = new HashSet<>(); + bindingSets.forEach(x -> actual.add(x.getBindingSet())); + Assert.assertEquals(expected, actual); + + processor.stop(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/periodic.notification/tests/src/test/java/org/apache/rya/periodic/notification/pruner/PeriodicNotificationBinPrunerIT.java ---------------------------------------------------------------------- diff --git a/extras/periodic.notification/tests/src/test/java/org/apache/rya/periodic/notification/pruner/PeriodicNotificationBinPrunerIT.java b/extras/periodic.notification/tests/src/test/java/org/apache/rya/periodic/notification/pruner/PeriodicNotificationBinPrunerIT.java new file mode 100644 index 0000000..830fa46 --- /dev/null +++ b/extras/periodic.notification/tests/src/test/java/org/apache/rya/periodic/notification/pruner/PeriodicNotificationBinPrunerIT.java @@ -0,0 +1,283 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.periodic.notification.pruner; + +import java.time.ZonedDateTime; +import java.time.format.DateTimeFormatter; +import java.util.Collection; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +import javax.xml.datatype.DatatypeFactory; + +import org.apache.fluo.api.client.FluoClient; +import org.apache.fluo.api.client.Snapshot; +import org.apache.fluo.api.client.scanner.ColumnScanner; +import org.apache.fluo.api.client.scanner.RowScanner; +import org.apache.fluo.api.data.Bytes; +import org.apache.fluo.api.data.ColumnValue; +import org.apache.fluo.api.data.Span; +import org.apache.fluo.core.client.FluoClientImpl; +import org.apache.rya.api.resolver.RdfToRyaConversions; +import org.apache.rya.indexing.pcj.fluo.api.CreatePeriodicQuery; +import org.apache.rya.indexing.pcj.fluo.api.InsertTriples; +import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants; +import org.apache.rya.indexing.pcj.fluo.app.NodeType; +import org.apache.rya.indexing.pcj.fluo.app.util.FluoQueryUtils; +import org.apache.rya.indexing.pcj.fluo.app.util.PeriodicQueryUtil; +import org.apache.rya.indexing.pcj.fluo.app.util.RowKeyUtil; +import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage; +import org.apache.rya.indexing.pcj.storage.PeriodicQueryStorageException; +import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator; +import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPeriodicQueryResultStorage; +import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; +import org.apache.rya.pcj.fluo.test.base.RyaExportITBase; +import org.apache.rya.periodic.notification.api.NodeBin; +import org.junit.Assert; +import org.junit.Test; +import org.openrdf.model.Statement; +import org.openrdf.model.ValueFactory; +import org.openrdf.model.impl.LiteralImpl; +import org.openrdf.model.impl.ValueFactoryImpl; +import org.openrdf.model.vocabulary.XMLSchema; +import org.openrdf.query.BindingSet; +import org.openrdf.query.algebra.evaluation.QueryBindingSet; +import org.openrdf.query.impl.MapBindingSet; + +import com.google.common.collect.Sets; + +public class PeriodicNotificationBinPrunerIT extends RyaExportITBase { + + + @Test + public void periodicPrunerTest() throws Exception { + + String sparql = "prefix function: <http://org.apache.rya/function#> " // n + + "prefix time: <http://www.w3.org/2006/time#> " // n + + "select ?id (count(?obs) as ?total) where {" // n + + "Filter(function:periodic(?time, 2, .5, time:hours)) " // n + + "?obs <uri:hasTime> ?time. " // n + + "?obs <uri:hasId> ?id } group by ?id"; // n + + FluoClient fluo = new FluoClientImpl(super.getFluoConfiguration()); + + // initialize resources and create pcj + PeriodicQueryResultStorage periodicStorage = new AccumuloPeriodicQueryResultStorage(super.getAccumuloConnector(), + getRyaInstanceName()); + CreatePeriodicQuery createPeriodicQuery = new CreatePeriodicQuery(fluo, periodicStorage); + String queryId = FluoQueryUtils.convertFluoQueryIdToPcjId(createPeriodicQuery.createPeriodicQuery(sparql).getQueryId()); + + // create statements to ingest into Fluo + final ValueFactory vf = new ValueFactoryImpl(); + final DatatypeFactory dtf = DatatypeFactory.newInstance(); + ZonedDateTime time = ZonedDateTime.now(); + long currentTime = time.toInstant().toEpochMilli(); + + ZonedDateTime zTime1 = time.minusMinutes(30); + String time1 = zTime1.format(DateTimeFormatter.ISO_INSTANT); + + ZonedDateTime zTime2 = zTime1.minusMinutes(30); + String time2 = zTime2.format(DateTimeFormatter.ISO_INSTANT); + + ZonedDateTime zTime3 = zTime2.minusMinutes(30); + String time3 = zTime3.format(DateTimeFormatter.ISO_INSTANT); + + ZonedDateTime zTime4 = zTime3.minusMinutes(30); + String time4 = zTime4.format(DateTimeFormatter.ISO_INSTANT); + + final Collection<Statement> statements = Sets.newHashSet( + vf.createStatement(vf.createURI("urn:obs_1"), vf.createURI("uri:hasTime"), + vf.createLiteral(dtf.newXMLGregorianCalendar(time1))), + vf.createStatement(vf.createURI("urn:obs_1"), vf.createURI("uri:hasId"), vf.createLiteral("id_1")), + vf.createStatement(vf.createURI("urn:obs_2"), vf.createURI("uri:hasTime"), + vf.createLiteral(dtf.newXMLGregorianCalendar(time2))), + vf.createStatement(vf.createURI("urn:obs_2"), vf.createURI("uri:hasId"), vf.createLiteral("id_2")), + vf.createStatement(vf.createURI("urn:obs_3"), vf.createURI("uri:hasTime"), + vf.createLiteral(dtf.newXMLGregorianCalendar(time3))), + vf.createStatement(vf.createURI("urn:obs_3"), vf.createURI("uri:hasId"), vf.createLiteral("id_3")), + vf.createStatement(vf.createURI("urn:obs_4"), vf.createURI("uri:hasTime"), + vf.createLiteral(dtf.newXMLGregorianCalendar(time4))), + vf.createStatement(vf.createURI("urn:obs_4"), vf.createURI("uri:hasId"), vf.createLiteral("id_4")), + vf.createStatement(vf.createURI("urn:obs_1"), vf.createURI("uri:hasTime"), + vf.createLiteral(dtf.newXMLGregorianCalendar(time4))), + vf.createStatement(vf.createURI("urn:obs_1"), vf.createURI("uri:hasId"), vf.createLiteral("id_1")), + vf.createStatement(vf.createURI("urn:obs_2"), vf.createURI("uri:hasTime"), + vf.createLiteral(dtf.newXMLGregorianCalendar(time3))), + vf.createStatement(vf.createURI("urn:obs_2"), vf.createURI("uri:hasId"), vf.createLiteral("id_2"))); + + // add statements to Fluo + InsertTriples inserter = new InsertTriples(); + statements.forEach(x -> inserter.insert(fluo, RdfToRyaConversions.convertStatement(x))); + + super.getMiniFluo().waitForObservers(); + + // FluoITHelper.printFluoTable(fluo); + + // Create the expected results of the SPARQL query once the PCJ has been + // computed. + final Set<BindingSet> expected1 = new HashSet<>(); + final Set<BindingSet> expected2 = new HashSet<>(); + final Set<BindingSet> expected3 = new HashSet<>(); + final Set<BindingSet> expected4 = new HashSet<>(); + + long period = 1800000; + long binId = (currentTime / period) * period; + + long bin1 = binId; + long bin2 = binId + period; + long bin3 = binId + 2 * period; + long bin4 = binId + 3 * period; + + MapBindingSet bs = new MapBindingSet(); + bs.addBinding("total", vf.createLiteral("2", XMLSchema.INTEGER)); + bs.addBinding("id", vf.createLiteral("id_1", XMLSchema.STRING)); + bs.addBinding("periodicBinId", vf.createLiteral(bin1)); + expected1.add(bs); + + bs = new MapBindingSet(); + bs.addBinding("total", vf.createLiteral("2", XMLSchema.INTEGER)); + bs.addBinding("id", vf.createLiteral("id_2", XMLSchema.STRING)); + bs.addBinding("periodicBinId", vf.createLiteral(bin1)); + expected1.add(bs); + + bs = new MapBindingSet(); + bs.addBinding("total", vf.createLiteral("1", XMLSchema.INTEGER)); + bs.addBinding("id", vf.createLiteral("id_3", XMLSchema.STRING)); + bs.addBinding("periodicBinId", vf.createLiteral(bin1)); + expected1.add(bs); + + bs = new MapBindingSet(); + bs.addBinding("total", vf.createLiteral("1", XMLSchema.INTEGER)); + bs.addBinding("id", vf.createLiteral("id_4", XMLSchema.STRING)); + bs.addBinding("periodicBinId", vf.createLiteral(bin1)); + expected1.add(bs); + + bs = new MapBindingSet(); + bs.addBinding("total", vf.createLiteral("1", XMLSchema.INTEGER)); + bs.addBinding("id", vf.createLiteral("id_1", XMLSchema.STRING)); + bs.addBinding("periodicBinId", vf.createLiteral(bin2)); + expected2.add(bs); + + bs = new MapBindingSet(); + bs.addBinding("total", vf.createLiteral("2", XMLSchema.INTEGER)); + bs.addBinding("id", vf.createLiteral("id_2", XMLSchema.STRING)); + bs.addBinding("periodicBinId", vf.createLiteral(bin2)); + expected2.add(bs); + + bs = new MapBindingSet(); + bs.addBinding("total", vf.createLiteral("1", XMLSchema.INTEGER)); + bs.addBinding("id", vf.createLiteral("id_3", XMLSchema.STRING)); + bs.addBinding("periodicBinId", vf.createLiteral(bin2)); + expected2.add(bs); + + bs = new MapBindingSet(); + bs.addBinding("total", vf.createLiteral("1", XMLSchema.INTEGER)); + bs.addBinding("id", vf.createLiteral("id_1", XMLSchema.STRING)); + bs.addBinding("periodicBinId", vf.createLiteral(bin3)); + expected3.add(bs); + + bs = new MapBindingSet(); + bs.addBinding("total", vf.createLiteral("1", XMLSchema.INTEGER)); + bs.addBinding("id", vf.createLiteral("id_2", XMLSchema.STRING)); + bs.addBinding("periodicBinId", vf.createLiteral(bin3)); + expected3.add(bs); + + bs = new MapBindingSet(); + bs.addBinding("total", vf.createLiteral("1", XMLSchema.INTEGER)); + bs.addBinding("id", vf.createLiteral("id_1", XMLSchema.STRING)); + bs.addBinding("periodicBinId", vf.createLiteral(bin4)); + expected4.add(bs); + + // make sure that expected and actual results align after ingest + compareResults(periodicStorage, queryId, bin1, expected1); + compareResults(periodicStorage, queryId, bin2, expected2); + compareResults(periodicStorage, queryId, bin3, expected3); + compareResults(periodicStorage, queryId, bin4, expected4); + + BlockingQueue<NodeBin> bins = new LinkedBlockingQueue<>(); + PeriodicQueryPrunerExecutor pruner = new PeriodicQueryPrunerExecutor(periodicStorage, fluo, 1, bins); + pruner.start(); + + bins.add(new NodeBin(queryId, bin1)); + bins.add(new NodeBin(queryId, bin2)); + bins.add(new NodeBin(queryId, bin3)); + bins.add(new NodeBin(queryId, bin4)); + + Thread.sleep(10000); + + compareResults(periodicStorage, queryId, bin1, new HashSet<>()); + compareResults(periodicStorage, queryId, bin2, new HashSet<>()); + compareResults(periodicStorage, queryId, bin3, new HashSet<>()); + compareResults(periodicStorage, queryId, bin4, new HashSet<>()); + + compareFluoCounts(fluo, queryId, bin1); + compareFluoCounts(fluo, queryId, bin2); + compareFluoCounts(fluo, queryId, bin3); + compareFluoCounts(fluo, queryId, bin4); + + pruner.stop(); + + } + + private void compareResults(PeriodicQueryResultStorage periodicStorage, String queryId, long bin, Set<BindingSet> expected) throws PeriodicQueryStorageException, Exception { + try(CloseableIterator<BindingSet> iter = periodicStorage.listResults(queryId, Optional.of(bin))) { + Set<BindingSet> actual = new HashSet<>(); + while(iter.hasNext()) { + actual.add(iter.next()); + } + Assert.assertEquals(expected, actual); + } + } + + private void compareFluoCounts(FluoClient client, String pcjId, long bin) { + QueryBindingSet bs = new QueryBindingSet(); + bs.addBinding(IncrementalUpdateConstants.PERIODIC_BIN_ID, new LiteralImpl(Long.toString(bin), XMLSchema.LONG)); + + VariableOrder varOrder = new VariableOrder(IncrementalUpdateConstants.PERIODIC_BIN_ID); + + try(Snapshot sx = client.newSnapshot()) { + String fluoQueryId = NodeType.generateNewIdForType(NodeType.QUERY, pcjId); + Set<String> ids = new HashSet<>(); + PeriodicQueryUtil.getPeriodicQueryNodeAncestorIds(sx, fluoQueryId, ids); + for(String id: ids) { + NodeType optNode = NodeType.fromNodeId(id).orNull(); + if(optNode == null) throw new RuntimeException("Invalid NodeType."); + Bytes prefix = RowKeyUtil.makeRowKey(id,varOrder, bs); + RowScanner scanner = sx.scanner().fetch(optNode.getResultColumn()).over(Span.prefix(prefix)).byRow().build(); + int count = 0; + Iterator<ColumnScanner> colScannerIter = scanner.iterator(); + while(colScannerIter.hasNext()) { + ColumnScanner colScanner = colScannerIter.next(); + String row = colScanner.getRow().toString(); + Iterator<ColumnValue> values = colScanner.iterator(); + while(values.hasNext()) { + values.next(); + count++; + } + } + Assert.assertEquals(0, count); + } + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/periodic.notification/tests/src/test/java/org/apache/rya/periodic/notification/registration/kafka/PeriodicCommandNotificationConsumerIT.java ---------------------------------------------------------------------- diff --git a/extras/periodic.notification/tests/src/test/java/org/apache/rya/periodic/notification/registration/kafka/PeriodicCommandNotificationConsumerIT.java b/extras/periodic.notification/tests/src/test/java/org/apache/rya/periodic/notification/registration/kafka/PeriodicCommandNotificationConsumerIT.java new file mode 100644 index 0000000..522e69d --- /dev/null +++ b/extras/periodic.notification/tests/src/test/java/org/apache/rya/periodic/notification/registration/kafka/PeriodicCommandNotificationConsumerIT.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */package org.apache.rya.periodic.notification.registration.kafka; + +import java.util.Properties; +import java.util.UUID; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.log4j.BasicConfigurator; +import org.apache.rya.kafka.base.KafkaITBase; +import org.apache.rya.kafka.base.KafkaTestInstanceRule; +import org.apache.rya.periodic.notification.coordinator.PeriodicNotificationCoordinatorExecutor; +import org.apache.rya.periodic.notification.notification.CommandNotification; +import org.apache.rya.periodic.notification.notification.TimestampedNotification; +import org.apache.rya.periodic.notification.registration.KafkaNotificationRegistrationClient; +import org.apache.rya.periodic.notification.serialization.CommandNotificationSerializer; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; + +public class PeriodicCommandNotificationConsumerIT extends KafkaITBase { + + private KafkaNotificationRegistrationClient registration; + private PeriodicNotificationCoordinatorExecutor coord; + private KafkaNotificationProvider provider; + private String bootstrapServer; + + @Rule + public KafkaTestInstanceRule rule = new KafkaTestInstanceRule(false); + + @Before + public void init() throws Exception { + bootstrapServer = createBootstrapServerConfig().getProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG); + } + + @Test + public void kafkaNotificationProviderTest() throws InterruptedException { + + BasicConfigurator.configure(); + + BlockingQueue<TimestampedNotification> notifications = new LinkedBlockingQueue<>(); + Properties props = createKafkaConfig(); + KafkaProducer<String, CommandNotification> producer = new KafkaProducer<>(props); + String topic = rule.getKafkaTopicName(); + rule.createTopic(topic); + + registration = new KafkaNotificationRegistrationClient(topic, producer); + coord = new PeriodicNotificationCoordinatorExecutor(1, notifications); + provider = new KafkaNotificationProvider(topic, new StringDeserializer(), new CommandNotificationSerializer(), props, coord, 1); + provider.start(); + + registration.addNotification("1", 1, 0, TimeUnit.SECONDS); + Thread.sleep(4000); + // check that notifications are being added to the blocking queue + Assert.assertEquals(true, notifications.size() > 0); + + registration.deleteNotification("1"); + Thread.sleep(2000); + int size = notifications.size(); + // sleep for 2 seconds to ensure no more messages being produced + Thread.sleep(2000); + Assert.assertEquals(size, notifications.size()); + + tearDown(); + } + + @Test + public void kafkaNotificationMillisProviderTest() throws InterruptedException { + + BasicConfigurator.configure(); + + BlockingQueue<TimestampedNotification> notifications = new LinkedBlockingQueue<>(); + Properties props = createKafkaConfig(); + KafkaProducer<String, CommandNotification> producer = new KafkaProducer<>(props); + String topic = rule.getKafkaTopicName(); + rule.createTopic(topic); + + registration = new KafkaNotificationRegistrationClient(topic, producer); + coord = new PeriodicNotificationCoordinatorExecutor(1, notifications); + provider = new KafkaNotificationProvider(topic, new StringDeserializer(), new CommandNotificationSerializer(), props, coord, 1); + provider.start(); + + registration.addNotification("1", 1000, 0, TimeUnit.MILLISECONDS); + Thread.sleep(4000); + // check that notifications are being added to the blocking queue + Assert.assertEquals(true, notifications.size() > 0); + + registration.deleteNotification("1"); + Thread.sleep(2000); + int size = notifications.size(); + // sleep for 2 seconds to ensure no more messages being produced + Thread.sleep(2000); + Assert.assertEquals(size, notifications.size()); + + tearDown(); + } + + private void tearDown() { + registration.close(); + provider.stop(); + coord.stop(); + } + + private Properties createKafkaConfig() { + Properties props = new Properties(); + props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer); + props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString()); + props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "consumer0"); + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, CommandNotificationSerializer.class.getName()); + + return props; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/periodic.notification/tests/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/extras/periodic.notification/tests/src/test/resources/log4j.properties b/extras/periodic.notification/tests/src/test/resources/log4j.properties new file mode 100644 index 0000000..19cc13c --- /dev/null +++ b/extras/periodic.notification/tests/src/test/resources/log4j.properties @@ -0,0 +1,37 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# Valid levels: +# TRACE, DEBUG, INFO, WARN, ERROR and FATAL +log4j.rootLogger=INFO, CONSOLE + +# Set independent logging levels +log4j.logger.org.apache.zookeeper=WARN +log4j.logger.kafka=WARN +log4j.logger.org.apache.kafka=WARN + +# LOGFILE is set to be a File appender using a PatternLayout. +log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender +#log4j.appender.CONSOLE.Threshold=DEBUG + +log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout +log4j.appender.CONSOLE.layout.ConversionPattern=%d [%t] %-5p %c - %m%n + +#log4j.appender.CONSOLE.layout=org.apache.log4j.EnhancedPatternLayout +#log4j.appender.CONSOLE.layout.ConversionPattern=%d [%t] %-5p %c{1.} - %m%n \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/periodic.notification/tests/src/test/resources/notification.properties ---------------------------------------------------------------------- diff --git a/extras/periodic.notification/tests/src/test/resources/notification.properties b/extras/periodic.notification/tests/src/test/resources/notification.properties new file mode 100644 index 0000000..4b25b93 --- /dev/null +++ b/extras/periodic.notification/tests/src/test/resources/notification.properties @@ -0,0 +1,35 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +#/ +accumulo.auths= +accumulo.instance="instance" +accumulo.user="root" +accumulo.password="secret" +accumulo.rya.prefix="rya_" +accumulo.zookeepers= +fluo.app.name="fluo_app" +fluo.table.name="fluo_table" +kafka.bootstrap.servers=127.0.0.1:9092 +kafka.notification.topic=notifications +kafka.notification.client.id=consumer0 +kafka.notification.group.id=group0 +cep.coordinator.threads=1 +cep.producer.threads=1 +cep.exporter.threads=1 +cep.processor.threads=1 +cep.pruner.threads=1 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/pom.xml ---------------------------------------------------------------------- diff --git a/extras/pom.xml b/extras/pom.xml index 82930bc..53c7b4f 100644 --- a/extras/pom.xml +++ b/extras/pom.xml @@ -33,7 +33,7 @@ under the License. <modules> <module>rya.prospector</module> <module>rya.manual</module> - <module>rya.periodic.service</module> + <module>periodic.notification</module> <module>shell</module> <module>indexing</module> <module>rya.indexing.pcj</module>