http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplication.java ---------------------------------------------------------------------- diff --git a/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplication.java b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplication.java new file mode 100644 index 0000000..92a7d18 --- /dev/null +++ b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplication.java @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.periodic.notification.application; + +import org.apache.log4j.Logger; +import org.apache.rya.indexing.pcj.fluo.app.util.PeriodicQueryUtil; +import org.apache.rya.periodic.notification.api.BinPruner; +import org.apache.rya.periodic.notification.api.BindingSetRecord; +import org.apache.rya.periodic.notification.api.LifeCycle; +import org.apache.rya.periodic.notification.api.NodeBin; +import org.apache.rya.periodic.notification.api.NotificationCoordinatorExecutor; +import org.apache.rya.periodic.notification.exporter.KafkaExporterExecutor; +import org.apache.rya.periodic.notification.processor.NotificationProcessorExecutor; +import org.apache.rya.periodic.notification.pruner.PeriodicQueryPrunerExecutor; +import org.apache.rya.periodic.notification.registration.kafka.KafkaNotificationProvider; +import org.openrdf.query.algebra.evaluation.function.Function; + +import com.google.common.base.Preconditions; + +/** + * The PeriodicNotificationApplication runs the key components of the Periodic + * Query Service. It consists of a {@link KafkaNotificationProvider}, a + * {@link NotificationCoordinatorExecutor}, a + * {@link NotificationProcessorExecutor}, a {@link KafkaExporterExecutor}, and a + * {@link PeriodicQueryPrunerExecutor}. These services run in coordination with + * one another to perform the following tasks in the indicated order: <br> + * <li>Retrieve new requests to generate periodic notifications from Kafka + * <li>Register them with the {@link NotificationCoordinatorExecutor} to + * generate the periodic notifications + * <li>As notifications are generated, they are added to a work queue that is + * monitored by the {@link NotificationProcessorExecutor}. + * <li>The processor processes the notifications by reading all of the query + * results corresponding to the bin and query id indicated by the notification. 
+ * <li>After reading the results, the processor adds a {@link BindingSetRecord} + * to a work queue monitored by the {@link KafkaExporterExecutor}. + * <li>The processor then adds a {@link NodeBin} to a work queue monitored by the + * {@link BinPruner} + * <li>The exporter processes the BindingSetRecord by exporting the result to + * Kafka + * <li>The BinPruner processes the NodeBin by cleaning up the results for the + * indicated bin and query in Accumulo and Fluo. <br> + * <br> + * The purpose of this Periodic Query Service is to facilitate the ability to + * answer Periodic Queries using the Rya Fluo application, where a Periodic + * Query is any query requesting periodic updates about events that occurred + * within a given window of time of this instant. This is also known as a + * rolling window query. Periodic Queries can be expressed using SPARQL by + * including the {@link Function} indicated by the URI + * {@link PeriodicQueryUtil#PeriodicQueryURI}. The user must provide this + * Function with the following arguments: the temporal variable in the query + * that will be filtered on, the window of time that events must occur within, + * the period at which the user wants to receive updates, and the time unit. The + * following query requests all observations that occurred within the last + * minute and requests updates every 15 seconds. It also performs a count on + * those observations. <br> + * <br> + * <li>prefix function: http://org.apache.rya/function# + * <li>"prefix time: http://www.w3.org/2006/time# + * <li>"select (count(?obs) as ?total) where { + * <li>"Filter(function:periodic(?time, 1, .25, time:minutes)) + * <li>"?obs uri:hasTime ?time. 
+ * <li>"?obs uri:hasId ?id } + * <li> + */ +public class PeriodicNotificationApplication implements LifeCycle { + + private static final Logger log = Logger.getLogger(PeriodicNotificationApplication.class); + private NotificationCoordinatorExecutor coordinator; + private KafkaNotificationProvider provider; + private PeriodicQueryPrunerExecutor pruner; + private NotificationProcessorExecutor processor; + private KafkaExporterExecutor exporter; + private boolean running = false; + + /** + * Creates a PeriodicNotificationApplication + * @param provider - {@link KafkaNotificationProvider} that retrieves new Notificaiton requests from Kafka + * @param coordinator - {NotificationCoordinator} that manages PeriodicNotifications. + * @param processor - {@link NotificationProcessorExecutor} that processes PeriodicNotifications + * @param exporter - {@link KafkaExporterExecutor} that exports periodic results + * @param pruner - {@link PeriodicQueryPrunerExecutor} that cleans up old periodic bins + */ + public PeriodicNotificationApplication(KafkaNotificationProvider provider, NotificationCoordinatorExecutor coordinator, + NotificationProcessorExecutor processor, KafkaExporterExecutor exporter, PeriodicQueryPrunerExecutor pruner) { + this.provider = Preconditions.checkNotNull(provider); + this.coordinator = Preconditions.checkNotNull(coordinator); + this.processor = Preconditions.checkNotNull(processor); + this.exporter = Preconditions.checkNotNull(exporter); + this.pruner = Preconditions.checkNotNull(pruner); + } + + @Override + public void start() { + if (!running) { + log.info("Starting PeriodicNotificationApplication."); + coordinator.start(); + provider.start(); + processor.start(); + pruner.start(); + exporter.start(); + running = true; + } + } + + @Override + public void stop() { + log.info("Stopping PeriodicNotificationApplication."); + provider.stop(); + coordinator.stop(); + processor.stop(); + pruner.stop(); + exporter.stop(); + running = false; + } + + /** + * 
@return boolean indicating whether the application is running + */ + @Override + public boolean currentlyRunning() { + return running; + } + + public static Builder builder() { + return new Builder(); + } + + public static class Builder { + + private PeriodicQueryPrunerExecutor pruner; + private KafkaNotificationProvider provider; + private NotificationProcessorExecutor processor; + private KafkaExporterExecutor exporter; + private NotificationCoordinatorExecutor coordinator; + + /** + * Sets the PeriodicQueryPrunerExecutor. + * @param pruner - PeriodicQueryPrunerExecutor for cleaning up old periodic bins + * @return this Builder for chaining method calls + */ + public Builder setPruner(PeriodicQueryPrunerExecutor pruner) { + this.pruner = pruner; + return this; + } + + /** + * Sets the KafkaNotificationProvider + * @param provider - KafkaNotificationProvider for retrieving new periodic notification requests from Kafka + * @return this Builder for chaining method calls + */ + public Builder setProvider(KafkaNotificationProvider provider) { + this.provider = provider; + return this; + } + + public Builder setProcessor(NotificationProcessorExecutor processor) { + this.processor = processor; + return this; + } + + /** + * Sets KafkaExporterExecutor + * @param exporter for exporting periodic query results to Kafka + * @return this Builder for chaining method calls + */ + public Builder setExporter(KafkaExporterExecutor exporter) { + this.exporter = exporter; + return this; + } + + /** + * Sets NotificationCoordinatorExecutor + * @param coordinator for managing and generating periodic notifications + * @return this Builder for chaining method calls + */ + public Builder setCoordinator(NotificationCoordinatorExecutor coordinator) { + this.coordinator = coordinator; + return this; + } + + /** + * Creates a PeriodicNotificationApplication + * @return PeriodicNotificationApplication for periodically polling Rya Fluo Application + */ + public PeriodicNotificationApplication 
build() { + return new PeriodicNotificationApplication(provider, coordinator, processor, exporter, pruner); + } + + } + +}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationConfiguration.java ---------------------------------------------------------------------- diff --git a/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationConfiguration.java b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationConfiguration.java new file mode 100644 index 0000000..d69efe5 --- /dev/null +++ b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationConfiguration.java @@ -0,0 +1,254 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.periodic.notification.application; + +import java.util.Properties; + +import org.apache.rya.accumulo.AccumuloRdfConfiguration; + +import jline.internal.Preconditions; + +/** + * Configuration object for creating a {@link PeriodicNotificationApplication}. 
+ */ +public class PeriodicNotificationApplicationConfiguration extends AccumuloRdfConfiguration { + + public static String FLUO_APP_NAME = "fluo.app.name"; + public static String FLUO_TABLE_NAME = "fluo.table.name"; + public static String KAFKA_BOOTSTRAP_SERVERS = "kafka.bootstrap.servers"; + public static String NOTIFICATION_TOPIC = "kafka.notification.topic"; + public static String NOTIFICATION_GROUP_ID = "kafka.notification.group.id"; + public static String NOTIFICATION_CLIENT_ID = "kafka.notification.client.id"; + public static String COORDINATOR_THREADS = "cep.coordinator.threads"; + public static String PRODUCER_THREADS = "cep.producer.threads"; + public static String EXPORTER_THREADS = "cep.exporter.threads"; + public static String PROCESSOR_THREADS = "cep.processor.threads"; + public static String PRUNER_THREADS = "cep.pruner.threads"; + + public PeriodicNotificationApplicationConfiguration() {} + + /** + * Creates an PeriodicNotificationApplicationConfiguration object from a Properties file. This method assumes + * that all values in the Properties file are Strings and that the Properties file uses the keys below. + * See rya.cep/cep.integration.tests/src/test/resources/properties/notification.properties for an example. + * <br> + * <ul> + * <li>"accumulo.auths" - String of Accumulo authorizations. Default is empty String. + * <li>"accumulo.instance" - Accumulo instance name (required) + * <li>"accumulo.user" - Accumulo user (required) + * <li>"accumulo.password" - Accumulo password (required) + * <li>"accumulo.rya.prefix" - Prefix for Accumulo backed Rya instance. 
Default is "rya_" + * <li>"accumulo.zookeepers" - Zookeepers for underlying Accumulo instance (required) + * <li>"fluo.app.name" - Name of Fluo Application (required) + * <li>"fluo.table.name" - Name of Fluo Table (required) + * <li>"kafka.bootstrap.servers" - Kafka Bootstrap servers for Producers and Consumers (required) + * <li>"kafka.notification.topic" - Topic to which new Periodic Notifications are published. Default is "notifications". + * <li>"kafka.notification.client.id" - Client Id for notification topic. Default is "consumer0" + * <li>"kafka.notification.group.id" - Group Id for notification topic. Default is "group0" + * <li>"cep.coordinator.threads" - Number of threads used by coordinator. Default is 1. + * <li>"cep.producer.threads" - Number of threads used by producer. Default is 1. + * <li>"cep.exporter.threads" - Number of threads used by exporter. Default is 1. + * <li>"cep.processor.threads" - Number of threads used by processor. Default is 1. + * <li>"cep.pruner.threads" - Number of threads used by pruner. Default is 1. 
+ * </ul> + * <br> + * @param props - Properties file containing Accumulo specific configuration parameters + * @return AccumumuloRdfConfiguration with properties set + */ + public PeriodicNotificationApplicationConfiguration(Properties props) { + super(fromProperties(props)); + setFluoAppName(props.getProperty(FLUO_APP_NAME)); + setFluoTableName(props.getProperty(FLUO_TABLE_NAME)); + setBootStrapServers(props.getProperty(KAFKA_BOOTSTRAP_SERVERS)); + setNotificationClientId(props.getProperty(NOTIFICATION_CLIENT_ID, "consumer0")); + setNotificationTopic(props.getProperty(NOTIFICATION_TOPIC, "notifications")); + setNotificationGroupId(props.getProperty(NOTIFICATION_GROUP_ID, "group0")); + setProducerThreads(Integer.parseInt(props.getProperty(PRODUCER_THREADS, "1"))); + setProcessorThreads(Integer.parseInt(props.getProperty(PROCESSOR_THREADS, "1"))); + setExporterThreads(Integer.parseInt(props.getProperty(EXPORTER_THREADS, "1"))); + setPrunerThreads(Integer.parseInt(props.getProperty(PRUNER_THREADS, "1"))); + setCoordinatorThreads(Integer.parseInt(props.getProperty(COORDINATOR_THREADS, "1"))); + } + + /** + * Sets the name of the Fluo Application + * @param fluoAppName + */ + public void setFluoAppName(String fluoAppName) { + set(FLUO_APP_NAME, Preconditions.checkNotNull(fluoAppName)); + } + + /** + * Sets the name of the Fluo table + * @param fluoTableName + */ + public void setFluoTableName(String fluoTableName) { + set(FLUO_TABLE_NAME, Preconditions.checkNotNull(fluoTableName)); + } + + /** + * Sets the Kafka bootstrap servers + * @param bootStrapServers + */ + public void setBootStrapServers(String bootStrapServers) { + set(KAFKA_BOOTSTRAP_SERVERS, Preconditions.checkNotNull(bootStrapServers)); + } + + /** + * Sets the Kafka topic name for new notification requests + * @param notificationTopic + */ + public void setNotificationTopic(String notificationTopic) { + set(NOTIFICATION_TOPIC, Preconditions.checkNotNull(notificationTopic)); + } + + /** + * Sets the 
GroupId for new notification request topic + * @param notificationGroupId + */ + public void setNotificationGroupId(String notificationGroupId) { + set(NOTIFICATION_GROUP_ID, Preconditions.checkNotNull(notificationGroupId)); + } + + /** + * Sets the ClientId for the Kafka notification topic + * @param notificationClientId + */ + public void setNotificationClientId(String notificationClientId) { + set(NOTIFICATION_GROUP_ID, Preconditions.checkNotNull(notificationClientId)); + } + + /** + * Sets the number of threads for the coordinator + * @param threads + */ + public void setCoordinatorThreads(int threads) { + setInt(COORDINATOR_THREADS, threads); + } + + /** + * Sets the number of threads for the exporter + * @param threads + */ + public void setExporterThreads(int threads) { + setInt(EXPORTER_THREADS, threads); + } + + /** + * Sets the number of threads for the producer for reading new periodic notifications + * @param threads + */ + public void setProducerThreads(int threads) { + setInt(PRODUCER_THREADS, threads); + } + + /** + * Sets the number of threads for the bin pruner + * @param threads + */ + public void setPrunerThreads(int threads) { + setInt(PRUNER_THREADS, threads); + } + + /** + * Sets the number of threads for the Notification processor + * @param threads + */ + public void setProcessorThreads(int threads) { + setInt(PROCESSOR_THREADS, threads); + } + + /** + * @return name of the Fluo application + */ + public String getFluoAppName() { + return get(FLUO_APP_NAME); + } + + /** + * @return name of the Fluo table + */ + public String getFluoTableName() { + return get(FLUO_TABLE_NAME); + } + + /** + * @return Kafka bootstrap servers + */ + public String getBootStrapServers() { + return get(KAFKA_BOOTSTRAP_SERVERS); + } + + /** + * @return notification topic + */ + public String getNotificationTopic() { + return get(NOTIFICATION_TOPIC, "notifications"); + } + + /** + * @return Kafka GroupId for the notificaton topic + */ + public String 
getNotificationGroupId() { + return get(NOTIFICATION_GROUP_ID, "group0"); + } + + /** + * @return Kafka ClientId for the notification topic + */ + public String getNotificationClientId() { + return get(NOTIFICATION_CLIENT_ID, "consumer0"); + } + + /** + * @return the number of threads for the coordinator + */ + public int getCoordinatorThreads() { + return getInt(COORDINATOR_THREADS, 1); + } + + /** + * @return the number of threads for the exporter + */ + public int getExporterThreads() { + return getInt(EXPORTER_THREADS, 1); + } + + /** + * @return the number of threads for the notification producer + */ + public int getProducerThreads() { + return getInt(PRODUCER_THREADS, 1); + } + + /** + * @return the number of threads for the bin pruner + */ + public int getPrunerThreads() { + return getInt(PRUNER_THREADS, 1); + } + + /** + * @return number of threads for the processor + */ + public int getProcessorThreads() { + return getInt(PROCESSOR_THREADS, 1); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationFactory.java ---------------------------------------------------------------------- diff --git a/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationFactory.java b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationFactory.java new file mode 100644 index 0000000..771a4ab --- /dev/null +++ b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationFactory.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.periodic.notification.application; + +import java.util.Optional; +import java.util.Properties; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.client.ZooKeeperInstance; +import org.apache.accumulo.core.client.security.tokens.PasswordToken; +import org.apache.fluo.api.client.FluoClient; +import org.apache.fluo.api.client.Snapshot; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.rya.indexing.pcj.fluo.app.util.FluoClientFactory; +import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage; +import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPeriodicQueryResultStorage; +import org.apache.rya.periodic.notification.api.BindingSetRecord; +import org.apache.rya.periodic.notification.api.NodeBin; +import 
org.apache.rya.periodic.notification.api.NotificationCoordinatorExecutor;
import org.apache.rya.periodic.notification.coordinator.PeriodicNotificationCoordinatorExecutor;
import org.apache.rya.periodic.notification.exporter.KafkaExporterExecutor;
import org.apache.rya.periodic.notification.notification.TimestampedNotification;
import org.apache.rya.periodic.notification.processor.NotificationProcessorExecutor;
import org.apache.rya.periodic.notification.pruner.PeriodicQueryPrunerExecutor;
import org.apache.rya.periodic.notification.recovery.PeriodicNotificationProvider;
import org.apache.rya.periodic.notification.registration.kafka.KafkaNotificationProvider;
import org.apache.rya.periodic.notification.serialization.BindingSetSerDe;
import org.apache.rya.periodic.notification.serialization.CommandNotificationSerializer;
import org.openrdf.query.BindingSet;

/**
 * Factory for creating a {@link PeriodicNotificationApplication}.
 */
public class PeriodicNotificationApplicationFactory {

    /**
     * Create a PeriodicNotificationApplication.
     * <p>
     * The three work queues that connect the services are created here and shared between them:
     * notifications (coordinator to processor), bins (processor to pruner) and binding sets
     * (processor to exporter). The coordinator is started during construction so that the
     * notifications already registered in Fluo can be scheduled. If construction fails, the
     * coordinator and the Fluo client opened so far are released before the exception propagates.
     *
     * @param props - Properties file that specifies the parameters needed to create the application
     * @return PeriodicNotificationApplication to periodically poll Rya Fluo for new results
     * @throws PeriodicApplicationException if the connection to Accumulo cannot be established; the
     *             underlying Accumulo exception is attached as the cause
     */
    public static PeriodicNotificationApplication getPeriodicApplication(Properties props) throws PeriodicApplicationException {
        PeriodicNotificationApplicationConfiguration conf = new PeriodicNotificationApplicationConfiguration(props);
        Properties kafkaProps = getKafkaProperties(conf);

        BlockingQueue<TimestampedNotification> notifications = new LinkedBlockingQueue<>();
        BlockingQueue<NodeBin> bins = new LinkedBlockingQueue<>();
        BlockingQueue<BindingSetRecord> bindingSets = new LinkedBlockingQueue<>();

        FluoClient fluo = null;
        NotificationCoordinatorExecutor coordinator = null;
        boolean built = false;
        try {
            PeriodicQueryResultStorage storage = getPeriodicQueryResultStorage(conf);
            fluo = FluoClientFactory.getFluoClient(conf.getFluoAppName(), Optional.of(conf.getFluoTableName()), conf);
            coordinator = getCoordinator(conf.getCoordinatorThreads(), notifications);
            // The snapshot is only needed while the registered notifications are read back, so it is
            // closed as soon as that is done.
            // NOTE(review): assumes processRegisteredNotifications consumes the snapshot synchronously - confirm.
            try (Snapshot sx = fluo.newSnapshot()) {
                addRegisteredNotices(coordinator, sx);
            }
            KafkaExporterExecutor exporter = getExporter(conf.getExporterThreads(), kafkaProps, bindingSets);
            PeriodicQueryPrunerExecutor pruner = getPruner(storage, fluo, conf.getPrunerThreads(), bins);
            NotificationProcessorExecutor processor = getProcessor(storage, notifications, bins, bindingSets, conf.getProcessorThreads());
            KafkaNotificationProvider provider = getProvider(conf.getProducerThreads(), conf.getNotificationTopic(), coordinator, kafkaProps);
            PeriodicNotificationApplication application = PeriodicNotificationApplication.builder().setCoordinator(coordinator)
                    .setProvider(provider).setExporter(exporter).setProcessor(processor).setPruner(pruner).build();
            built = true;
            return application;
        } catch (AccumuloException | AccumuloSecurityException e) {
            // Same message as before, but keep the original exception as the cause so that the
            // stack trace of the Accumulo failure is not lost.
            PeriodicApplicationException wrapped = new PeriodicApplicationException(e.getMessage());
            wrapped.initCause(e);
            throw wrapped;
        } finally {
            // On success the Fluo client is owned by the pruner and must stay open.
            if (!built) {
                releaseOnFailure(coordinator, fluo);
            }
        }
    }

    /**
     * Best-effort cleanup for a failed construction: stops the coordinator (it may already have been
     * started) and closes the Fluo client. Failures during cleanup are ignored so that they cannot
     * mask the exception that caused the construction to fail.
     */
    private static void releaseOnFailure(NotificationCoordinatorExecutor coordinator, FluoClient fluo) {
        if (coordinator != null) {
            try {
                coordinator.stop();
            } catch (RuntimeException ignored) {
                // the original failure is the one worth reporting
            }
        }
        if (fluo != null) {
            try {
                fluo.close();
            } catch (Exception ignored) {
                // the original failure is the one worth reporting
            }
        }
    }

    /**
     * Starts the coordinator and registers with it the notifications found through the given snapshot.
     */
    private static void addRegisteredNotices(NotificationCoordinatorExecutor coord, Snapshot sx) {
        coord.start();
        PeriodicNotificationProvider provider = new PeriodicNotificationProvider();
        provider.processRegisteredNotifications(coord, sx);
    }

    /** Creates the coordinator that feeds the given notification queue. */
    private static NotificationCoordinatorExecutor getCoordinator(int numThreads, BlockingQueue<TimestampedNotification> notifications) {
        return new PeriodicNotificationCoordinatorExecutor(numThreads, notifications);
    }

    /** Creates the exporter, backed by a new Kafka producer, that drains the given binding set queue. */
    private static KafkaExporterExecutor getExporter(int numThreads, Properties props, BlockingQueue<BindingSetRecord> bindingSets) {
        KafkaProducer<String, BindingSet> producer = new KafkaProducer<>(props, new StringSerializer(), new BindingSetSerDe());
        return new KafkaExporterExecutor(producer, numThreads, bindingSets);
    }

    /** Creates the pruner that drains the given bin queue. */
    private static PeriodicQueryPrunerExecutor getPruner(PeriodicQueryResultStorage storage, FluoClient fluo, int numThreads,
            BlockingQueue<NodeBin> bins) {
        return new PeriodicQueryPrunerExecutor(storage, fluo, numThreads, bins);
    }

    /** Creates the processor that connects the notification queue to the bin and binding set queues. */
    private static NotificationProcessorExecutor getProcessor(PeriodicQueryResultStorage periodicStorage,
            BlockingQueue<TimestampedNotification> notifications, BlockingQueue<NodeBin> bins, BlockingQueue<BindingSetRecord> bindingSets,
            int numThreads) {
        return new NotificationProcessorExecutor(periodicStorage, notifications, bins, bindingSets, numThreads);
    }

    /** Creates the Kafka provider that passes notification requests from the given topic to the coordinator. */
    private static KafkaNotificationProvider getProvider(int numThreads, String topic, NotificationCoordinatorExecutor coord,
            Properties props) {
        return new KafkaNotificationProvider(topic, new StringDeserializer(), new CommandNotificationSerializer(), props, coord,
                numThreads);
    }

    /**
     * Connects to Accumulo with the configured instance, zookeepers, user and password and wraps the
     * connection in a periodic query result storage for the configured table prefix.
     */
    private static PeriodicQueryResultStorage getPeriodicQueryResultStorage(PeriodicNotificationApplicationConfiguration conf)
            throws AccumuloException, AccumuloSecurityException {
        Instance instance = new ZooKeeperInstance(conf.getAccumuloInstance(), conf.getAccumuloZookeepers());
        Connector conn = instance.getConnector(conf.getAccumuloUser(), new PasswordToken(conf.getAccumuloPassword()));
        String ryaInstance = conf.getTablePrefix();
        return new AccumuloPeriodicQueryResultStorage(conn, ryaInstance);
    }

    /**
     * Builds the Kafka client properties (bootstrap servers, client id, group id, and an
     * "earliest" offset reset) from the configuration.
     */
    private static Properties getKafkaProperties(PeriodicNotificationApplicationConfiguration conf) {
        Properties kafkaProps = new Properties();
        kafkaProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, conf.getBootStrapServers());
        kafkaProps.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, conf.getNotificationClientId());
        kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, conf.getNotificationGroupId());
        kafkaProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return kafkaProps;
    }

}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/coordinator/PeriodicNotificationCoordinatorExecutor.java ---------------------------------------------------------------------- diff --git a/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/coordinator/PeriodicNotificationCoordinatorExecutor.java b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/coordinator/PeriodicNotificationCoordinatorExecutor.java new file mode 100644 index 0000000..0486244 --- /dev/null +++ b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/coordinator/PeriodicNotificationCoordinatorExecutor.java @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.periodic.notification.coordinator; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantLock; + +import org.apache.rya.periodic.notification.api.Notification; +import org.apache.rya.periodic.notification.api.NotificationCoordinatorExecutor; +import org.apache.rya.periodic.notification.api.NotificationProcessor; +import org.apache.rya.periodic.notification.notification.CommandNotification; +import org.apache.rya.periodic.notification.notification.PeriodicNotification; +import org.apache.rya.periodic.notification.notification.TimestampedNotification; +import org.apache.rya.periodic.notification.notification.CommandNotification.Command; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; + +/** + * Implementation of {@link NotificationCoordinatorExecutor} that generates regular notifications + * as indicated by {@link PeriodicNotification}s that are registered with this Object. When notifications + * are generated they are placed on a work queue to be processed by the {@link NotificationProcessor}. 
+ * + */
public class PeriodicNotificationCoordinatorExecutor implements NotificationCoordinatorExecutor {

    private static final Logger LOG = LoggerFactory.getLogger(PeriodicNotificationCoordinatorExecutor.class);

    /** Maximum time, in milliseconds, that {@link #stop()} waits for scheduled producers to finish. */
    private static final long SHUTDOWN_TIMEOUT_MS = 5000;

    private final int numThreads;
    private ScheduledExecutorService producerThreadPool;
    // Scheduled producer task for every registered notification, keyed by notification id.
    private final Map<String, ScheduledFuture<?>> serviceMap = new HashMap<>();
    private final BlockingQueue<TimestampedNotification> notifications;
    // Fair lock guarding serviceMap.
    private final ReentrantLock lock = new ReentrantLock(true);
    private boolean running = false;

    /**
     * Creates a coordinator that schedules periodic notifications.
     *
     * @param numThreads - size of the scheduled thread pool created by {@link #start()}
     * @param notifications - work queue that generated {@link TimestampedNotification}s are placed on
     */
    public PeriodicNotificationCoordinatorExecutor(int numThreads, BlockingQueue<TimestampedNotification> notifications) {
        this.numThreads = numThreads;
        this.notifications = notifications;
    }

    /**
     * Applies the ADD or DELETE command carried by the notification while holding the lock.
     *
     * @param notification - command to add or delete a periodic notification
     */
    @Override
    public void processNextCommandNotification(CommandNotification notification) {
        lock.lock();
        try {
            processNotification(notification);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Creates the scheduled thread pool if the coordinator is not already running.
     */
    @Override
    public void start() {
        if (!running) {
            producerThreadPool = Executors.newScheduledThreadPool(numThreads);
            running = true;
        }
    }

    /**
     * Shuts down the scheduled thread pool, waiting up to five seconds for running
     * producers before forcing termination. Calling this method before
     * {@link #start()} is a no-op.
     */
    @Override
    public void stop() {
        running = false;

        if (producerThreadPool == null) {
            // start() was never called, so there is nothing to shut down.
            return;
        }

        producerThreadPool.shutdown();
        try {
            if (!producerThreadPool.awaitTermination(SHUTDOWN_TIMEOUT_MS, TimeUnit.MILLISECONDS)) {
                producerThreadPool.shutdownNow();
            }
        } catch (InterruptedException e) {
            LOG.info("Interrupted while waiting for notification producers to terminate. Forcing shutdown.");
            producerThreadPool.shutdownNow();
            // Preserve the interrupt so the caller can observe it.
            Thread.currentThread().interrupt();
        }

        // The futures belong to the pool that was just shut down. Forget them so that the
        // same notification ids can be registered again after a restart.
        lock.lock();
        try {
            serviceMap.clear();
        } finally {
            lock.unlock();
        }
    }

    private void processNotification(CommandNotification notification) {
        Command command = notification.getCommand();
        Notification periodic = notification.getNotification();
        switch (command) {
        case ADD:
            addNotification(periodic);
            break;
        case DELETE:
            deleteNotification(periodic);
            break;
        default:
            // Other commands are ignored, exactly as before.
            break;
        }
    }

    /**
     * Schedules a producer for the notification unless one is already registered under its id.
     */
    private void addNotification(Notification notification) {
        Preconditions.checkArgument(notification instanceof PeriodicNotification);
        PeriodicNotification notify = (PeriodicNotification) notification;
        if (!serviceMap.containsKey(notify.getId())) {
            ScheduledFuture<?> future = producerThreadPool.scheduleAtFixedRate(new NotificationProducer(notify),
                    notify.getInitialDelay(), notify.getPeriod(), notify.getTimeUnit());
            serviceMap.put(notify.getId(), future);
        }
    }

    /**
     * Cancels and forgets the producer registered under the notification's id.
     *
     * @return true if a producer was registered and has been cancelled, false otherwise
     */
    private boolean deleteNotification(Notification notification) {
        ScheduledFuture<?> future = serviceMap.remove(notification.getId());
        if (future == null) {
            return false;
        }
        future.cancel(true);
        return true;
    }

    /**
     * Scheduled Task that places a {@link PeriodicNotification}
     * in the work queue at regular intervals.
     *
     */
    class NotificationProducer implements Runnable {

        private final PeriodicNotification notification;

        public NotificationProducer(PeriodicNotification notification) {
            this.notification = notification;
        }

        @Override
        public void run() {
            try {
                notifications.put(new TimestampedNotification(notification));
            } catch (InterruptedException e) {
                LOG.info("Unable to add notification. Process interrupted. ");
                // Restore the interrupt flag before propagating.
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        }

    }

    @Override
    public boolean currentlyRunning() {
        return running;
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/exporter/KafkaExporterExecutor.java ---------------------------------------------------------------------- diff --git a/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/exporter/KafkaExporterExecutor.java b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/exporter/KafkaExporterExecutor.java new file mode 100644 index 0000000..c2e5ebf --- /dev/null +++ b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/exporter/KafkaExporterExecutor.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */
package org.apache.rya.periodic.notification.exporter;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.log4j.Logger;
import org.apache.rya.periodic.notification.api.BindingSetExporter;
import org.apache.rya.periodic.notification.api.BindingSetRecord;
import org.apache.rya.periodic.notification.api.LifeCycle;
import org.openrdf.query.BindingSet;

import jline.internal.Preconditions;

/**
 * Executor service that runs {@link KafkaPeriodicBindingSetExporter}s, the
 * {@link BindingSetExporter} implementation that publishes to Kafka.
 *
 */
public class KafkaExporterExecutor implements LifeCycle {

    private static final Logger log = Logger.getLogger(KafkaExporterExecutor.class);

    /** Maximum time, in milliseconds, that {@link #stop()} waits for exporter threads to finish. */
    private static final long SHUTDOWN_TIMEOUT_MS = 5000;

    private KafkaProducer<String, BindingSet> producer;
    private BlockingQueue<BindingSetRecord> bindingSets;
    private ExecutorService executor;
    private List<KafkaPeriodicBindingSetExporter> exporters;
    private int num_Threads;
    private boolean running = false;

    /**
     * Creates a KafkaExporterExecutor for exporting periodic query results to Kafka.
     * @param producer for publishing results to Kafka
     * @param num_Threads number of threads used to publish results
     * @param bindingSets - work queue containing {@link BindingSet}s to be published
     */
    public KafkaExporterExecutor(KafkaProducer<String, BindingSet> producer, int num_Threads, BlockingQueue<BindingSetRecord> bindingSets) {
        Preconditions.checkNotNull(producer);
        Preconditions.checkNotNull(bindingSets);
        this.producer = producer;
        this.bindingSets = bindingSets;
        this.num_Threads = num_Threads;
        this.exporters = new ArrayList<>();
    }

    /**
     * Creates a fixed thread pool and submits one exporter per thread. Does nothing
     * if the executor is already running.
     */
    @Override
    public void start() {
        if (!running) {
            executor = Executors.newFixedThreadPool(num_Threads);

            for (int threadNumber = 0; threadNumber < num_Threads; threadNumber++) {
                log.info("Creating exporter:" + threadNumber);
                KafkaPeriodicBindingSetExporter exporter = new KafkaPeriodicBindingSetExporter(producer, threadNumber, bindingSets);
                exporters.add(exporter);
                executor.submit(exporter);
            }
            running = true;
        }
    }

    /**
     * Signals all exporters to stop, waits up to five seconds for their threads to
     * finish (interrupting them afterwards), and then closes the Kafka producer.
     * Safe to call when {@link #start()} was never invoked.
     */
    @Override
    public void stop() {
        running = false;

        // Ask every exporter to leave its loop once its current record is done.
        exporters.forEach(KafkaPeriodicBindingSetExporter::shutdown);
        exporters.clear();

        // The executor only exists after start().
        if (executor != null) {
            executor.shutdown();
            try {
                if (!executor.awaitTermination(SHUTDOWN_TIMEOUT_MS, TimeUnit.MILLISECONDS)) {
                    log.info("Timed out waiting for consumer threads to shut down, exiting uncleanly");
                    executor.shutdownNow();
                }
            } catch (InterruptedException e) {
                log.info("Interrupted during shutdown, exiting uncleanly");
                executor.shutdownNow();
                // Preserve the interrupt so the caller can observe it.
                Thread.currentThread().interrupt();
            }
        }

        // Close the producer last so that no exporter thread is still sending through it.
        if (producer != null) {
            producer.close();
        }
    }

    @Override
    public boolean currentlyRunning() {
        return running;
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/exporter/KafkaPeriodicBindingSetExporter.java ---------------------------------------------------------------------- diff --git
a/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/exporter/KafkaPeriodicBindingSetExporter.java b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/exporter/KafkaPeriodicBindingSetExporter.java new file mode 100644 index 0000000..8a0322f --- /dev/null +++ b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/exporter/KafkaPeriodicBindingSetExporter.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
package org.apache.rya.periodic.notification.exporter;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.log4j.Logger;
import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants;
import org.apache.rya.periodic.notification.api.BindingSetExporter;
import org.apache.rya.periodic.notification.api.BindingSetRecord;
import org.apache.rya.periodic.notification.api.BindingSetRecordExportException;
import org.openrdf.model.Literal;
import org.openrdf.query.BindingSet;

import jline.internal.Preconditions;

/**
 * Object that exports {@link BindingSet}s to the Kafka topic indicated by
 * the {@link BindingSetRecord}.
 *
 */
public class KafkaPeriodicBindingSetExporter implements BindingSetExporter, Runnable {

    private static final Logger log = Logger.getLogger(BindingSetExporter.class);
    private KafkaProducer<String, BindingSet> producer;
    private BlockingQueue<BindingSetRecord> bindingSets;
    private AtomicBoolean closed = new AtomicBoolean(false);
    private int threadNumber;

    /**
     * Creates an exporter that drains the given work queue.
     *
     * @param producer - producer used to publish results to Kafka
     * @param threadNumber - identifier of this exporter, used in log messages
     * @param bindingSets - work queue containing the records to export
     */
    public KafkaPeriodicBindingSetExporter(KafkaProducer<String, BindingSet> producer, int threadNumber,
            BlockingQueue<BindingSetRecord> bindingSets) {
        Preconditions.checkNotNull(producer);
        Preconditions.checkNotNull(bindingSets);
        this.threadNumber = threadNumber;
        this.producer = producer;
        this.bindingSets = bindingSets;
    }

    /**
     * Exports BindingSets to Kafka. The BindingSet and topic are extracted from
     * the indicated BindingSetRecord and the BindingSet is then exported to the topic.
     * The record key is the value of the periodic bin id binding.
     *
     * @param record - record holding the BindingSet and the topic to publish it to
     * @throws BindingSetRecordExportException if Kafka does not confirm the record within
     *             five seconds, the send fails, or the thread is interrupted while waiting
     */
    @Override
    public void exportNotification(BindingSetRecord record) throws BindingSetRecordExportException {
        String bindingName = IncrementalUpdateConstants.PERIODIC_BIN_ID;
        BindingSet bindingSet = record.getBindingSet();
        String topic = record.getTopic();
        long binId = ((Literal) bindingSet.getValue(bindingName)).longValue();
        final Future<RecordMetadata> future = producer
                .send(new ProducerRecord<String, BindingSet>(topic, Long.toString(binId), bindingSet));
        try {
            // wait for confirmation that results have been received
            future.get(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so that run() stops at its next blocking call.
            Thread.currentThread().interrupt();
            throw exportFailure(e);
        } catch (ExecutionException | TimeoutException e) {
            throw exportFailure(e);
        }
    }

    /**
     * Wraps the failure in a {@link BindingSetRecordExportException} and keeps the
     * original exception as the cause; a {@link TimeoutException} usually has no
     * message, so the message alone would carry no information.
     */
    private static BindingSetRecordExportException exportFailure(Exception cause) {
        BindingSetRecordExportException failure = new BindingSetRecordExportException(cause.getMessage());
        failure.initCause(cause);
        return failure;
    }

    /**
     * Takes records from the work queue and exports them until {@link #shutdown()} is
     * called or the thread is interrupted. A record that fails to export is logged and
     * skipped so that one bad send does not end the exporter.
     */
    @Override
    public void run() {
        try {
            while (!closed.get()) {
                BindingSetRecord record = bindingSets.take();
                try {
                    exportNotification(record);
                } catch (BindingSetRecordExportException e) {
                    log.error("Thread " + threadNumber + " is unable to process message.", e);
                }
            }
        } catch (InterruptedException e) {
            log.trace("Thread " + threadNumber + " was interrupted while waiting for the next message.");
            Thread.currentThread().interrupt();
        }
    }


    /**
     * Requests that {@link #run()} exit once the record in progress has been handled.
     */
    public void shutdown() {
        closed.set(true);
    }

}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/processor/NotificationProcessorExecutor.java ---------------------------------------------------------------------- diff --git a/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/processor/NotificationProcessorExecutor.java b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/processor/NotificationProcessorExecutor.java new file mode 100644 index 0000000..a9a5ad1 --- /dev/null +++ b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/processor/NotificationProcessorExecutor.java @@ -0,0 +1,114 @@ + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */
package org.apache.rya.periodic.notification.processor;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.apache.log4j.Logger;
import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
import org.apache.rya.periodic.notification.api.BindingSetRecord;
import org.apache.rya.periodic.notification.api.LifeCycle;
import org.apache.rya.periodic.notification.api.NodeBin;
import org.apache.rya.periodic.notification.notification.TimestampedNotification;

import com.google.common.base.Preconditions;

/**
 * Executor service that runs {@link TimestampedNotificationProcessor}s with basic
 * functionality for starting, stopping, and determining whether notification processors are
 * being executed.
 *
 */
public class NotificationProcessorExecutor implements LifeCycle {

    private static final Logger log = Logger.getLogger(NotificationProcessorExecutor.class);

    /** Maximum time, in milliseconds, that {@link #stop()} waits for processor threads to finish. */
    private static final long SHUTDOWN_TIMEOUT_MS = 5000;

    private BlockingQueue<TimestampedNotification> notifications; // notifications
    private BlockingQueue<NodeBin> bins; // entries to delete from Fluo
    private BlockingQueue<BindingSetRecord> bindingSets; // query results to
                                                         // export
    private PeriodicQueryResultStorage periodicStorage;
    private List<TimestampedNotificationProcessor> processors;
    private int numberThreads;
    private ExecutorService executor;
    private boolean running = false;

    /**
     * Creates NotificationProcessorExecutor.
     * @param periodicStorage - storage layer that periodic results are read from
     * @param notifications - notifications are pulled from this queue, and the timestamp indicates which bin of results to query for
     * @param bins - after notifications are processed, they are added to the bin to be deleted
     * @param bindingSets - results read from the storage layer to be exported
     * @param numberThreads - number of threads used for processing
     */
    public NotificationProcessorExecutor(PeriodicQueryResultStorage periodicStorage, BlockingQueue<TimestampedNotification> notifications,
            BlockingQueue<NodeBin> bins, BlockingQueue<BindingSetRecord> bindingSets, int numberThreads) {
        this.notifications = Preconditions.checkNotNull(notifications);
        this.bins = Preconditions.checkNotNull(bins);
        this.bindingSets = Preconditions.checkNotNull(bindingSets);
        this.periodicStorage = periodicStorage;
        this.numberThreads = numberThreads;
        processors = new ArrayList<>();
    }

    /**
     * Creates a fixed thread pool and submits one processor per thread. Does nothing
     * if the executor is already running.
     */
    @Override
    public void start() {
        if (!running) {
            executor = Executors.newFixedThreadPool(numberThreads);
            for (int threadNumber = 0; threadNumber < numberThreads; threadNumber++) {
                log.info("Creating processor:" + threadNumber);
                TimestampedNotificationProcessor processor = TimestampedNotificationProcessor.builder().setBindingSets(bindingSets)
                        .setBins(bins).setPeriodicStorage(periodicStorage).setNotifications(notifications).setThreadNumber(threadNumber)
                        .build();
                processors.add(processor);
                executor.submit(processor);
            }
            running = true;
        }
    }

    /**
     * Signals all processors to stop and waits up to five seconds for their threads
     * to finish, interrupting them afterwards. Safe to call when {@link #start()}
     * was never invoked.
     */
    @Override
    public void stop() {
        running = false;

        processors.forEach(TimestampedNotificationProcessor::shutdown);
        processors.clear();

        if (executor == null) {
            // start() was never called, so there are no threads to wait for.
            return;
        }

        executor.shutdown();
        try {
            if (!executor.awaitTermination(SHUTDOWN_TIMEOUT_MS, TimeUnit.MILLISECONDS)) {
                log.info("Timed out waiting for consumer threads to shut down, exiting uncleanly");
                executor.shutdownNow();
            }
        } catch (InterruptedException e) {
            log.info("Interrupted during shutdown, exiting uncleanly");
            executor.shutdownNow();
            // Preserve the interrupt so the caller can observe it.
            Thread.currentThread().interrupt();
        }
    }

    @Override
    public boolean currentlyRunning() {
        return running;
    }

}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/processor/TimestampedNotificationProcessor.java ---------------------------------------------------------------------- diff --git a/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/processor/TimestampedNotificationProcessor.java b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/processor/TimestampedNotificationProcessor.java new file mode 100644 index 0000000..8b65683 --- /dev/null +++ b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/processor/TimestampedNotificationProcessor.java @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */
package org.apache.rya.periodic.notification.processor;

import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.log4j.Logger;
import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator;
import org.apache.rya.periodic.notification.api.BinPruner;
import org.apache.rya.periodic.notification.api.BindingSetRecord;
import org.apache.rya.periodic.notification.api.NodeBin;
import org.apache.rya.periodic.notification.api.NotificationProcessor;
import org.apache.rya.periodic.notification.exporter.KafkaPeriodicBindingSetExporter;
import org.apache.rya.periodic.notification.notification.TimestampedNotification;
import org.openrdf.query.BindingSet;

import com.google.common.base.Preconditions;

/**
 * Implementation of {@link NotificationProcessor} that uses the id indicated by
 * the {@link TimestampedNotification} to obtain results from the
 * {@link PeriodicQueryResultStorage} layer containing the results of the
 * Periodic Query. The TimestampedNotificationProcessor then parses the results
 * and adds them to work queues to be processed by the {@link BinPruner} and the
 * {@link KafkaPeriodicBindingSetExporter}.
 *
 */
public class TimestampedNotificationProcessor implements NotificationProcessor, Runnable {

    private static final Logger log = Logger.getLogger(TimestampedNotificationProcessor.class);
    private PeriodicQueryResultStorage periodicStorage;
    private BlockingQueue<TimestampedNotification> notifications; // notifications
                                                                  // to process
    private BlockingQueue<NodeBin> bins; // entries to delete from Fluo
    private BlockingQueue<BindingSetRecord> bindingSets; // query results to export
    private AtomicBoolean closed = new AtomicBoolean(false);
    private int threadNumber;


    /**
     * Creates a processor that reads notifications from one queue and writes to two others.
     *
     * @param periodicStorage - storage layer that periodic results are read from
     * @param notifications - work queue containing notifications to be processed
     * @param bins - work queue that processed bins are added to so they can be pruned
     * @param bindingSets - work queue that query results are added to so they can be exported
     * @param threadNumber - identifier of this processor, used in log messages
     */
    public TimestampedNotificationProcessor(PeriodicQueryResultStorage periodicStorage,
            BlockingQueue<TimestampedNotification> notifications, BlockingQueue<NodeBin> bins, BlockingQueue<BindingSetRecord> bindingSets,
            int threadNumber) {
        this.notifications = Preconditions.checkNotNull(notifications);
        this.bins = Preconditions.checkNotNull(bins);
        this.bindingSets = Preconditions.checkNotNull(bindingSets);
        this.periodicStorage = periodicStorage;
        this.threadNumber = threadNumber;
    }

    /**
     * Processes the TimestampNotifications by scanning the PCJ tables for
     * entries in the bin corresponding to
     * {@link TimestampedNotification#getTimestamp()} and adding them to the
     * export BlockingQueue. The TimestampNotification is then used to form a
     * {@link NodeBin} that is passed to the BinPruner BlockingQueue so that the
     * bins can be deleted from Fluo and Accumulo.
     * <p>
     * Failures while reading or queueing results are logged and not rethrown.
     *
     * @param notification - notification whose id, timestamp and period select the bin to read
     * @throws IllegalArgumentException if the notification's period is not positive
     */
    @Override
    public void processNotification(TimestampedNotification notification) {

        String id = notification.getId();
        long ts = notification.getTimestamp().getTime();
        long period = notification.getPeriod();
        long bin = getBinFromTimestamp(ts, period);
        NodeBin nodeBin = new NodeBin(id, bin);

        try (CloseableIterator<BindingSet> iter = periodicStorage.listResults(id, Optional.of(bin))) {

            while (iter.hasNext()) {
                bindingSets.add(new BindingSetRecord(iter.next(), id));
            }
            // add NodeBin to BinPruner queue so that bin can be deleted from
            // Fluo and Accumulo
            bins.add(nodeBin);
        } catch (Exception e) {
            // Log at error level with the exception; a failure here means results
            // for this bin were not fully exported.
            log.error("Encountered error: " + e.getMessage() + " while accessing periodic results for bin: " + bin + " for query: " + id, e);
        }
    }

    /**
     * Computes left bin end point containing event time ts
     *
     * @param ts - event time
     * @param period - length of period
     * @return left bin end point containing event time ts
     * @throws IllegalArgumentException if period is not positive
     */
    private long getBinFromTimestamp(long ts, long period) {
        Preconditions.checkArgument(period > 0);
        return (ts / period) * period;
    }

    /**
     * Takes notifications from the work queue and processes them until
     * {@link #shutdown()} is called. Any exception ends the loop and is rethrown
     * wrapped in a {@link RuntimeException}.
     */
    @Override
    public void run() {
        try {
            while (!closed.get()) {
                processNotification(notifications.take());
            }
        } catch (InterruptedException e) {
            log.trace("Thread_" + threadNumber + " is unable to process next notification.");
            // Restore the interrupt flag before propagating.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (Exception e) {
            // Log with the stack trace here, since the rethrown exception may never be inspected.
            log.error("Thread_" + threadNumber + " is unable to process next notification.", e);
            throw new RuntimeException(e);
        }

    }

    /**
     * Requests that {@link #run()} exit once the notification in progress has been handled.
     */
    public void shutdown() {
        closed.set(true);
    }

    /**
     * @return a new {@link Builder} for creating a TimestampedNotificationProcessor
     */
    public static Builder builder() {
        return new Builder();
    }



    public static class Builder {

        private PeriodicQueryResultStorage periodicStorage;
        private BlockingQueue<TimestampedNotification> notifications; // notifications to process
        private BlockingQueue<NodeBin> bins; // entries to delete from Fluo
        private BlockingQueue<BindingSetRecord> bindingSets; // query results to export

        private int threadNumber;

        /**
         * Set notification queue
         * @param notifications - work queue containing notifications to be processed
         * @return this Builder for chaining method calls
         */
        public Builder setNotifications(BlockingQueue<TimestampedNotification> notifications) {
            this.notifications = notifications;
            return this;
        }

        /**
         * Set nodeBin queue
         * @param bins - work queue containing NodeBins to be pruned
         * @return this Builder for chaining method calls
         */
        public Builder setBins(BlockingQueue<NodeBin> bins) {
            this.bins = bins;
            return this;
        }

        /**
         * Set BindingSet queue
         * @param bindingSets - work queue containing BindingSets to be exported
         * @return this Builder for chaining method calls
         */
        public Builder setBindingSets(BlockingQueue<BindingSetRecord> bindingSets) {
            this.bindingSets = bindingSets;
            return this;
        }

        /**
         * Sets the number that identifies the processor's thread
         * @param threadNumber - identifier of the processor, used in its log messages
         * @return this Builder for chaining method calls
         */
        public Builder setThreadNumber(int threadNumber) {
            this.threadNumber = threadNumber;
            return this;
        }

        /**
         * Set the PeriodicStorage layer
         * @param periodicStorage - periodic storage layer that periodic results are read from
         * @return - this Builder for chaining method calls
         */
        public Builder setPeriodicStorage(PeriodicQueryResultStorage periodicStorage) {
            this.periodicStorage = periodicStorage;
            return this;
        }

        /**
         * Builds a TimestampedNotificationProcessor
         * @return - TimestampedNotificationProcessor built from arguments passed to this Builder
         */
        public TimestampedNotificationProcessor build() {
            return new TimestampedNotificationProcessor(periodicStorage, notifications, bins, bindingSets, threadNumber);
        }

    }
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/pruner/AccumuloBinPruner.java ---------------------------------------------------------------------- diff --git
a/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/pruner/AccumuloBinPruner.java b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/pruner/AccumuloBinPruner.java new file mode 100644 index 0000000..4dac64c --- /dev/null +++ b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/pruner/AccumuloBinPruner.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
package org.apache.rya.periodic.notification.pruner;

import org.apache.log4j.Logger;
import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
import org.apache.rya.indexing.pcj.storage.PeriodicQueryStorageException;
import org.apache.rya.periodic.notification.api.BinPruner;
import org.apache.rya.periodic.notification.api.NodeBin;

import jline.internal.Preconditions;

/**
 * {@link BinPruner} that removes the BindingSets of a time bin from a PCJ table
 * through the {@link PeriodicQueryResultStorage} layer.
 */
public class AccumuloBinPruner implements BinPruner {

    private static final Logger log = Logger.getLogger(AccumuloBinPruner.class);
    private PeriodicQueryResultStorage periodicStorage;

    /**
     * @param periodicStorage - storage layer that results are deleted from; must not be null
     */
    public AccumuloBinPruner(PeriodicQueryResultStorage periodicStorage) {
        this.periodicStorage = Preconditions.checkNotNull(periodicStorage);
    }

    /**
     * Deletes every BindingSet stored for the bin of the given {@link NodeBin}
     * from the PCJ table identified by its node id. It is assumed that all
     * BindingSet entries for the corresponding bin are written to the PCJ table
     * so that the bin Id occurs first.
     *
     * @param nodeBin
     *            - pairs the pcj table id with the temporal bin to delete; must not be null
     * @throws RuntimeException if the storage layer fails to delete the results
     */
    @Override
    public void pruneBindingSetBin(NodeBin nodeBin) {
        Preconditions.checkNotNull(nodeBin);
        final String pcjTableId = nodeBin.getNodeId();
        final long binId = nodeBin.getBin();
        try {
            periodicStorage.deletePeriodicQueryResults(pcjTableId, binId);
        } catch (PeriodicQueryStorageException storageFailure) {
            log.trace("Unable to delete results from Peroidic Table: " + pcjTableId + " for bin: " + binId);
            throw new RuntimeException(storageFailure);
        }
    }

}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/pruner/FluoBinPruner.java ---------------------------------------------------------------------- diff --git a/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/pruner/FluoBinPruner.java b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/pruner/FluoBinPruner.java new file mode 100644 index 0000000..bee9c02 --- /dev/null +++ b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/pruner/FluoBinPruner.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied.
See the License for the + * specific language governing permissions and limitations + * under the License. + */
package org.apache.rya.periodic.notification.pruner;

import org.apache.fluo.api.client.FluoClient;
import org.apache.fluo.api.client.Transaction;
import org.apache.fluo.api.data.Bytes;
import org.apache.fluo.api.data.Column;
import org.apache.fluo.api.data.Span;
import org.apache.log4j.Logger;
import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants;
import org.apache.rya.indexing.pcj.fluo.app.NodeType;
import org.apache.rya.indexing.pcj.fluo.app.batch.BatchInformationDAO;
import org.apache.rya.indexing.pcj.fluo.app.batch.SpanBatchDeleteInformation;
import org.apache.rya.periodic.notification.api.BinPruner;
import org.apache.rya.periodic.notification.api.NodeBin;

import com.google.common.base.Optional;

/**
 * Deletes {@code BindingSet}s from the indicated Fluo table.
 */
public class FluoBinPruner implements BinPruner {

    private static final Logger log = Logger.getLogger(FluoBinPruner.class);
    private FluoClient client;

    /**
     * @param client - Fluo client used to open the transactions that register the deletes
     */
    public FluoBinPruner(FluoClient client) {
        this.client = client;
    }

    /**
     * This method deletes BindingSets in the specified bin from the BindingSet
     * Column of the indicated Fluo nodeId. The delete is registered as a span
     * batch-delete over the prefix formed by the node id, the node-id/binding-set
     * delimiter, and the bin.
     *
     * @param nodeBin
     *            - pairs the Fluo nodeId with the bin id to delete
     * @throws RuntimeException if no {@link NodeType} can be determined from the node id
     */
    @Override
    public void pruneBindingSetBin(NodeBin nodeBin) {
        String id = nodeBin.getNodeId();
        long bin = nodeBin.getBin();

        // Validate the id first so that no transaction is opened for an id that cannot be handled.
        Optional<NodeType> type = NodeType.fromNodeId(id);
        if (!type.isPresent()) {
            String message = "Unable to determine NodeType from id: " + id;
            log.trace(message);
            throw new RuntimeException(message);
        }

        try (Transaction tx = client.newTransaction()) {
            Column batchInfoColumn = type.get().getResultColumn();
            String batchInfoSpanPrefix = id + IncrementalUpdateConstants.NODEID_BS_DELIM + bin;
            SpanBatchDeleteInformation batchInfo = SpanBatchDeleteInformation.builder().setColumn(batchInfoColumn)
                    .setSpan(Span.prefix(Bytes.of(batchInfoSpanPrefix))).build();
            BatchInformationDAO.addBatch(tx, id, batchInfo);
            tx.commit();
        }
    }

}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/pruner/PeriodicQueryPruner.java ---------------------------------------------------------------------- diff --git a/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/pruner/PeriodicQueryPruner.java b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/pruner/PeriodicQueryPruner.java new file mode 100644 index 0000000..516690e --- /dev/null +++ b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/pruner/PeriodicQueryPruner.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */
package org.apache.rya.periodic.notification.pruner;

import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.fluo.api.client.FluoClient;
import org.apache.fluo.api.client.Snapshot;
import org.apache.fluo.api.client.SnapshotBase;
import org.apache.log4j.Logger;
import org.apache.rya.indexing.pcj.fluo.app.NodeType;
import org.apache.rya.indexing.pcj.fluo.app.util.PeriodicQueryUtil;
import org.apache.rya.periodic.notification.api.BinPruner;
import org.apache.rya.periodic.notification.api.NodeBin;

import jline.internal.Preconditions;

/**
 * Implementation of {@link BinPruner} that deletes old, already processed
 * Periodic Query results from Fluo and the PCJ table to which the Fluo results
 * are exported.
 *
 */
public class PeriodicQueryPruner implements BinPruner, Runnable {

    private static final Logger log = Logger.getLogger(PeriodicQueryPruner.class);
    private FluoClient client;
    private AccumuloBinPruner accPruner;
    private FluoBinPruner fluoPruner;
    private BlockingQueue<NodeBin> bins;
    private AtomicBoolean closed = new AtomicBoolean(false);
    private int threadNumber;

    /**
     * Creates a pruner that drains the given work queue.
     *
     * @param fluoPruner - pruner used to delete bins from Fluo
     * @param accPruner - pruner used to delete bins from the PCJ table
     * @param client - Fluo client used to look up the node ids of a query
     * @param bins - work queue containing the bins to be pruned
     * @param threadNumber - identifier of this pruner, used in log messages
     */
    public PeriodicQueryPruner(FluoBinPruner fluoPruner, AccumuloBinPruner accPruner, FluoClient client, BlockingQueue<NodeBin> bins, int threadNumber) {
        this.fluoPruner = Preconditions.checkNotNull(fluoPruner);
        this.accPruner = Preconditions.checkNotNull(accPruner);
        this.client = Preconditions.checkNotNull(client);
        this.bins = Preconditions.checkNotNull(bins);
        this.threadNumber = threadNumber;
    }

    /**
     * Takes bins from the work queue and prunes them until {@link #shutdown()} is
     * called. An interrupt ends the loop and is rethrown wrapped in a
     * {@link RuntimeException}.
     */
    @Override
    public void run() {
        try {
            while (!closed.get()) {
                pruneBindingSetBin(bins.take());
            }
        } catch (InterruptedException e) {
            log.trace("Thread " + threadNumber + " is unable to prune the next message.");
            // Restore the interrupt flag before propagating.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    /**
     * Prunes BindingSet bins from the Rya Fluo Application in addition to the BindingSet
     * bins created in the PCJ tables associated with the given query id. Failures are
     * logged and not rethrown.
     *
     * @param nodeBin - pairs the QueryResult Id for the Rya Fluo application with the
     *            bin id of the bins to be deleted
     */
    @Override
    public void pruneBindingSetBin(NodeBin nodeBin) {
        String pcjId = nodeBin.getNodeId();
        long bin = nodeBin.getBin();
        try (Snapshot sx = client.newSnapshot()) {
            String queryId = NodeType.generateNewIdForType(NodeType.QUERY, pcjId);
            Set<String> fluoIds = getNodeIdsFromResultId(sx, queryId);
            accPruner.pruneBindingSetBin(nodeBin);
            for (String fluoId : fluoIds) {
                fluoPruner.pruneBindingSetBin(new NodeBin(fluoId, bin));
            }
        } catch (Exception e) {
            // Log at error level with the exception; a failure here leaves stale results behind.
            log.error("Unable to prune bin: " + bin + " for query: " + pcjId, e);
        }
    }


    /**
     * Requests that {@link #run()} exit once the bin in progress has been handled.
     */
    public void shutdown() {
        closed.set(true);
    }

    /**
     * Collects the ids that {@link PeriodicQueryUtil#getPeriodicQueryNodeAncestorIds}
     * adds for the given id into a new set.
     */
    private Set<String> getNodeIdsFromResultId(SnapshotBase sx, String id) {
        Set<String> ids = new HashSet<>();
        PeriodicQueryUtil.getPeriodicQueryNodeAncestorIds(sx, id, ids);
        return ids;
    }


}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/pruner/PeriodicQueryPrunerExecutor.java ---------------------------------------------------------------------- diff --git a/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/pruner/PeriodicQueryPrunerExecutor.java b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/pruner/PeriodicQueryPrunerExecutor.java new file mode 100644 index 0000000..1c11f96 --- /dev/null +++ b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/pruner/PeriodicQueryPrunerExecutor.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.periodic.notification.pruner; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import org.apache.fluo.api.client.FluoClient; +import org.apache.log4j.Logger; +import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage; +import org.apache.rya.periodic.notification.api.LifeCycle; +import org.apache.rya.periodic.notification.api.NodeBin; + +import com.google.common.base.Preconditions; + +/** + * Executor service that runs {@link PeriodicQueryPruner}s with added functionality + * for starting, stopping, and determining if the query pruners are running. 
+ */ +public class PeriodicQueryPrunerExecutor implements LifeCycle { + + private static final Logger log = Logger.getLogger(PeriodicQueryPrunerExecutor.class); + private FluoClient client; + private int numThreads; + private ExecutorService executor; + private BlockingQueue<NodeBin> bins; + private PeriodicQueryResultStorage periodicStorage; + private List<PeriodicQueryPruner> pruners; + private boolean running = false; + + public PeriodicQueryPrunerExecutor(PeriodicQueryResultStorage periodicStorage, FluoClient client, int numThreads, + BlockingQueue<NodeBin> bins) { + Preconditions.checkArgument(numThreads > 0); + this.periodicStorage = periodicStorage; + this.numThreads = numThreads; + executor = Executors.newFixedThreadPool(numThreads); + this.bins = bins; + this.client = client; + this.pruners = new ArrayList<>(); + } + + @Override + public void start() { + if (!running) { + AccumuloBinPruner accPruner = new AccumuloBinPruner(periodicStorage); + FluoBinPruner fluoPruner = new FluoBinPruner(client); + + for (int threadNumber = 0; threadNumber < numThreads; threadNumber++) { + PeriodicQueryPruner pruner = new PeriodicQueryPruner(fluoPruner, accPruner, client, bins, threadNumber); + pruners.add(pruner); + executor.submit(pruner); + } + running = true; + } + } + + @Override + public void stop() { + if (pruners != null && pruners.size() > 0) { + pruners.forEach(x -> x.shutdown()); + } + if(client != null) { + client.close(); + } + if (executor != null) { + executor.shutdown(); + running = false; + } + try { + if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) { + log.info("Timed out waiting for consumer threads to shut down, exiting uncleanly"); + executor.shutdownNow(); + } + } catch (InterruptedException e) { + log.info("Interrupted during shutdown, exiting uncleanly"); + } + } + + @Override + public boolean currentlyRunning() { + return running; + } + +} 
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/recovery/PeriodicNotificationProvider.java ---------------------------------------------------------------------- diff --git a/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/recovery/PeriodicNotificationProvider.java b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/recovery/PeriodicNotificationProvider.java new file mode 100644 index 0000000..69bd39c --- /dev/null +++ b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/recovery/PeriodicNotificationProvider.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.periodic.notification.recovery; + +import java.util.Collection; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import org.apache.fluo.api.client.Snapshot; +import org.apache.fluo.api.client.scanner.ColumnScanner; +import org.apache.fluo.api.client.scanner.RowScanner; +import org.apache.fluo.api.data.Bytes; +import org.apache.fluo.api.data.ColumnValue; +import org.apache.fluo.api.data.Span; +import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants; +import org.apache.rya.indexing.pcj.fluo.app.NodeType; +import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; +import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO; +import org.apache.rya.indexing.pcj.fluo.app.query.PeriodicQueryMetadata; +import org.apache.rya.indexing.pcj.fluo.app.util.FluoQueryUtils; +import org.apache.rya.periodic.notification.api.NotificationCoordinatorExecutor; +import org.apache.rya.periodic.notification.coordinator.PeriodicNotificationCoordinatorExecutor; +import org.apache.rya.periodic.notification.notification.CommandNotification; +import org.apache.rya.periodic.notification.notification.CommandNotification.Command; +import org.apache.rya.periodic.notification.notification.PeriodicNotification; + +/** + * This class is used by the {@link PeriodicNotificationCoordinatorExecutor} + * to add all existing {@link PeriodicNotification}s stored in Fluo when it is + * initialized. This enables the the {@link PeriodicServiceApplication} to be + * recovered from failure by restoring it original state. + * + */ +public class PeriodicNotificationProvider { + + private FluoQueryMetadataDAO dao; + + public PeriodicNotificationProvider() { + this.dao = new FluoQueryMetadataDAO(); + } + + /** + * Retrieve all of the information about Periodic Query results already registered + * with Fluo. 
This is returned in the form of {@link CommandNotification}s that + * can be registered with the {@link NotificationCoordinatorExecutor}. + * @param sx - snapshot for reading results from Fluo + * @return - collection of CommandNotifications that indicate Periodic Query information registered with system + */ + public Collection<CommandNotification> getNotifications(Snapshot sx) { + Set<PeriodicQueryMetadata> periodicMetadata = new HashSet<>(); + RowScanner scanner = sx.scanner().fetch(FluoQueryColumns.PERIODIC_QUERY_NODE_ID) + .over(Span.prefix(IncrementalUpdateConstants.PERIODIC_QUERY_PREFIX)).byRow().build(); + Iterator<ColumnScanner> colScannerIter = scanner.iterator(); + while (colScannerIter.hasNext()) { + ColumnScanner colScanner = colScannerIter.next(); + Iterator<ColumnValue> values = colScanner.iterator(); + while (values.hasNext()) { + PeriodicQueryMetadata metadata = dao.readPeriodicQueryMetadata(sx, values.next().getsValue()); + periodicMetadata.add(metadata); + } + } + return getCommandNotifications(sx, periodicMetadata); + } + + /** + * Registers all of Periodic Query information already contained within Fluo to the + * {@link NotificationCoordinatorExecutor}. 
+ * @param coordinator - coordinator that periodic info will be registered with + * @param sx - snapshot for reading results from Fluo + */ + public void processRegisteredNotifications(NotificationCoordinatorExecutor coordinator, Snapshot sx) { + coordinator.start(); + Collection<CommandNotification> notifications = getNotifications(sx); + for(CommandNotification notification: notifications) { + coordinator.processNextCommandNotification(notification); + } + } + + private Collection<CommandNotification> getCommandNotifications(Snapshot sx, Collection<PeriodicQueryMetadata> metadata) { + Set<CommandNotification> notifications = new HashSet<>(); + int i = 1; + for(PeriodicQueryMetadata meta:metadata) { + //offset initial wait to avoid overloading system + PeriodicNotification periodic = new PeriodicNotification(getQueryId(meta.getNodeId(), sx), meta.getPeriod(),TimeUnit.MILLISECONDS,i*5000); + notifications.add(new CommandNotification(Command.ADD, periodic)); + i++; + } + return notifications; + } + + private String getQueryId(String periodicNodeId, Snapshot sx) { + return getQueryIdFromPeriodicId(sx, periodicNodeId); + } + + private String getQueryIdFromPeriodicId(Snapshot sx, String nodeId) { + NodeType nodeType = NodeType.fromNodeId(nodeId).orNull(); + String id = null; + switch (nodeType) { + case FILTER: + id = getQueryIdFromPeriodicId(sx, sx.get(Bytes.of(nodeId), FluoQueryColumns.FILTER_PARENT_NODE_ID).toString()); + break; + case PERIODIC_QUERY: + id = getQueryIdFromPeriodicId(sx, sx.get(Bytes.of(nodeId), FluoQueryColumns.PERIODIC_QUERY_PARENT_NODE_ID).toString()); + break; + case QUERY: + id = FluoQueryUtils.convertFluoQueryIdToPcjId(nodeId); + break; + case AGGREGATION: + id = getQueryIdFromPeriodicId(sx, sx.get(Bytes.of(nodeId), FluoQueryColumns.AGGREGATION_PARENT_NODE_ID).toString()); + break; + case CONSTRUCT: + id = getQueryIdFromPeriodicId(sx, sx.get(Bytes.of(nodeId), FluoQueryColumns.CONSTRUCT_PARENT_NODE_ID).toString()); + break; + case PROJECTION: + 
id = getQueryIdFromPeriodicId(sx, sx.get(Bytes.of(nodeId), FluoQueryColumns.PROJECTION_PARENT_NODE_ID).toString()); + break; + default: + throw new IllegalArgumentException("Invalid node type"); + + } + return id; + } + +}