http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationConfiguration.java ---------------------------------------------------------------------- diff --git a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationConfiguration.java b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationConfiguration.java deleted file mode 100644 index d69efe5..0000000 --- a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationConfiguration.java +++ /dev/null @@ -1,254 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.rya.periodic.notification.application; - -import java.util.Properties; - -import org.apache.rya.accumulo.AccumuloRdfConfiguration; - -import jline.internal.Preconditions; - -/** - * Configuration object for creating a {@link PeriodicNotificationApplication}. - */ -public class PeriodicNotificationApplicationConfiguration extends AccumuloRdfConfiguration { - - public static String FLUO_APP_NAME = "fluo.app.name"; - public static String FLUO_TABLE_NAME = "fluo.table.name"; - public static String KAFKA_BOOTSTRAP_SERVERS = "kafka.bootstrap.servers"; - public static String NOTIFICATION_TOPIC = "kafka.notification.topic"; - public static String NOTIFICATION_GROUP_ID = "kafka.notification.group.id"; - public static String NOTIFICATION_CLIENT_ID = "kafka.notification.client.id"; - public static String COORDINATOR_THREADS = "cep.coordinator.threads"; - public static String PRODUCER_THREADS = "cep.producer.threads"; - public static String EXPORTER_THREADS = "cep.exporter.threads"; - public static String PROCESSOR_THREADS = "cep.processor.threads"; - public static String PRUNER_THREADS = "cep.pruner.threads"; - - public PeriodicNotificationApplicationConfiguration() {} - - /** - * Creates an PeriodicNotificationApplicationConfiguration object from a Properties file. This method assumes - * that all values in the Properties file are Strings and that the Properties file uses the keys below. - * See rya.cep/cep.integration.tests/src/test/resources/properties/notification.properties for an example. - * <br> - * <ul> - * <li>"accumulo.auths" - String of Accumulo authorizations. Default is empty String. - * <li>"accumulo.instance" - Accumulo instance name (required) - * <li>"accumulo.user" - Accumulo user (required) - * <li>"accumulo.password" - Accumulo password (required) - * <li>"accumulo.rya.prefix" - Prefix for Accumulo backed Rya instance. Default is "rya_" - * <li>"accumulo.zookeepers" - Zookeepers for underlying Accumulo instance (required) - * <li>"fluo.app.name" - Name of Fluo Application (required) - * <li>"fluo.table.name" - Name of Fluo Table (required) - * <li>"kafka.bootstrap.servers" - Kafka Bootstrap servers for Producers and Consumers (required) - * <li>"kafka.notification.topic" - Topic to which new Periodic Notifications are published. Default is "notifications". - * <li>"kafka.notification.client.id" - Client Id for notification topic. Default is "consumer0" - * <li>"kafka.notification.group.id" - Group Id for notification topic. Default is "group0" - * <li>"cep.coordinator.threads" - Number of threads used by coordinator. Default is 1. - * <li>"cep.producer.threads" - Number of threads used by producer. Default is 1. - * <li>"cep.exporter.threads" - Number of threads used by exporter. Default is 1. - * <li>"cep.processor.threads" - Number of threads used by processor. Default is 1. - * <li>"cep.pruner.threads" - Number of threads used by pruner. Default is 1. - * </ul> - * <br> - * @param props - Properties file containing Accumulo specific configuration parameters - * @return AccumumuloRdfConfiguration with properties set - */ - public PeriodicNotificationApplicationConfiguration(Properties props) { - super(fromProperties(props)); - setFluoAppName(props.getProperty(FLUO_APP_NAME)); - setFluoTableName(props.getProperty(FLUO_TABLE_NAME)); - setBootStrapServers(props.getProperty(KAFKA_BOOTSTRAP_SERVERS)); - setNotificationClientId(props.getProperty(NOTIFICATION_CLIENT_ID, "consumer0")); - setNotificationTopic(props.getProperty(NOTIFICATION_TOPIC, "notifications")); - setNotificationGroupId(props.getProperty(NOTIFICATION_GROUP_ID, "group0")); - setProducerThreads(Integer.parseInt(props.getProperty(PRODUCER_THREADS, "1"))); - setProcessorThreads(Integer.parseInt(props.getProperty(PROCESSOR_THREADS, "1"))); - setExporterThreads(Integer.parseInt(props.getProperty(EXPORTER_THREADS, "1"))); - setPrunerThreads(Integer.parseInt(props.getProperty(PRUNER_THREADS, "1"))); - setCoordinatorThreads(Integer.parseInt(props.getProperty(COORDINATOR_THREADS, "1"))); - } - - /** - * Sets the name of the Fluo Application - * @param fluoAppName - */ - public void setFluoAppName(String fluoAppName) { - set(FLUO_APP_NAME, Preconditions.checkNotNull(fluoAppName)); - } - - /** - * Sets the name of the Fluo table - * @param fluoTableName - */ - public void setFluoTableName(String fluoTableName) { - set(FLUO_TABLE_NAME, Preconditions.checkNotNull(fluoTableName)); - } - - /** - * Sets the Kafka bootstrap servers - * @param bootStrapServers - */ - public void setBootStrapServers(String bootStrapServers) { - set(KAFKA_BOOTSTRAP_SERVERS, Preconditions.checkNotNull(bootStrapServers)); - } - - /** - * Sets the Kafka topic name for new notification requests - * @param notificationTopic - */ - public void setNotificationTopic(String notificationTopic) { - set(NOTIFICATION_TOPIC, Preconditions.checkNotNull(notificationTopic)); - } - - /** - * Sets the GroupId for new notification request topic - * @param notificationGroupId - */ - public void setNotificationGroupId(String notificationGroupId) { - set(NOTIFICATION_GROUP_ID, Preconditions.checkNotNull(notificationGroupId)); - } - - /** - * Sets the ClientId for the Kafka notification topic - * @param notificationClientId - */ - public void setNotificationClientId(String notificationClientId) { - set(NOTIFICATION_GROUP_ID, Preconditions.checkNotNull(notificationClientId)); - } - - /** - * Sets the number of threads for the coordinator - * @param threads - */ - public void setCoordinatorThreads(int threads) { - setInt(COORDINATOR_THREADS, threads); - } - - /** - * Sets the number of threads for the exporter - * @param threads - */ - public void setExporterThreads(int threads) { - setInt(EXPORTER_THREADS, threads); - } - - /** - * Sets the number of threads for the producer for reading new periodic notifications - * @param threads - */ - public void setProducerThreads(int threads) { - setInt(PRODUCER_THREADS, threads); - } - - /** - * Sets the number of threads for the bin pruner - * @param threads - */ - public void setPrunerThreads(int threads) { - setInt(PRUNER_THREADS, threads); - } - - /** - * Sets the number of threads for the Notification processor - * @param threads - */ - public void setProcessorThreads(int threads) { - setInt(PROCESSOR_THREADS, threads); - } - - /** - * @return name of the Fluo application - */ - public String getFluoAppName() { - return get(FLUO_APP_NAME); - } - - /** - * @return name of the Fluo table - */ - public String getFluoTableName() { - return get(FLUO_TABLE_NAME); - } - - /** - * @return Kafka bootstrap servers - */ - public String getBootStrapServers() { - return get(KAFKA_BOOTSTRAP_SERVERS); - } - - /** - * @return notification topic - */ - public String getNotificationTopic() { - return get(NOTIFICATION_TOPIC, "notifications"); - } - - /** - * @return Kafka GroupId for the notificaton topic - */ - public String getNotificationGroupId() { - return get(NOTIFICATION_GROUP_ID, "group0"); - } - - /** - * @return Kafka ClientId for the notification topic - */ - public String getNotificationClientId() { - return get(NOTIFICATION_CLIENT_ID, "consumer0"); - } - - /** - * @return the number of threads for the coordinator - */ - public int getCoordinatorThreads() { - return getInt(COORDINATOR_THREADS, 1); - } - - /** - * @return the number of threads for the exporter - */ - public int getExporterThreads() { - return getInt(EXPORTER_THREADS, 1); - } - - /** - * @return the number of threads for the notification producer - */ - public int getProducerThreads() { - return getInt(PRODUCER_THREADS, 1); - } - - /** - * @return the number of threads for the bin pruner - */ - public int getPrunerThreads() { - return getInt(PRUNER_THREADS, 1); - } - - /** - * @return number of threads for the processor - */ - public int getProcessorThreads() { - return getInt(PROCESSOR_THREADS, 1); - } - -}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationFactory.java ---------------------------------------------------------------------- diff --git a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationFactory.java b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationFactory.java deleted file mode 100644 index 771a4ab..0000000 --- a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationFactory.java +++ /dev/null @@ -1,140 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.rya.periodic.notification.application; - -import java.util.Optional; -import java.util.Properties; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; - -import org.apache.accumulo.core.client.AccumuloException; -import org.apache.accumulo.core.client.AccumuloSecurityException; -import org.apache.accumulo.core.client.Connector; -import org.apache.accumulo.core.client.Instance; -import org.apache.accumulo.core.client.ZooKeeperInstance; -import org.apache.accumulo.core.client.security.tokens.PasswordToken; -import org.apache.fluo.api.client.FluoClient; -import org.apache.fluo.api.client.Snapshot; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.common.serialization.StringDeserializer; -import org.apache.kafka.common.serialization.StringSerializer; -import org.apache.rya.indexing.pcj.fluo.app.util.FluoClientFactory; -import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage; -import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPeriodicQueryResultStorage; -import org.apache.rya.periodic.notification.api.BindingSetRecord; -import org.apache.rya.periodic.notification.api.NodeBin; -import org.apache.rya.periodic.notification.api.NotificationCoordinatorExecutor; -import org.apache.rya.periodic.notification.coordinator.PeriodicNotificationCoordinatorExecutor; -import org.apache.rya.periodic.notification.exporter.KafkaExporterExecutor; -import org.apache.rya.periodic.notification.notification.TimestampedNotification; -import org.apache.rya.periodic.notification.processor.NotificationProcessorExecutor; -import org.apache.rya.periodic.notification.pruner.PeriodicQueryPrunerExecutor; -import org.apache.rya.periodic.notification.recovery.PeriodicNotificationProvider; -import org.apache.rya.periodic.notification.registration.kafka.KafkaNotificationProvider; -import org.apache.rya.periodic.notification.serialization.BindingSetSerDe; -import org.apache.rya.periodic.notification.serialization.CommandNotificationSerializer; -import org.openrdf.query.BindingSet; - -/** - * Factory for creating a {@link PeriodicNotificationApplication}. - */ -public class PeriodicNotificationApplicationFactory { - - /** - * Create a PeriodicNotificationApplication. - * @param props - Properties file that specifies the parameters needed to create the application - * @return PeriodicNotificationApplication to periodically poll Rya Fluo for new results - * @throws PeriodicApplicationException - */ - public static PeriodicNotificationApplication getPeriodicApplication(Properties props) throws PeriodicApplicationException { - PeriodicNotificationApplicationConfiguration conf = new PeriodicNotificationApplicationConfiguration(props); - Properties kafkaProps = getKafkaProperties(conf); - - BlockingQueue<TimestampedNotification> notifications = new LinkedBlockingQueue<>(); - BlockingQueue<NodeBin> bins = new LinkedBlockingQueue<>(); - BlockingQueue<BindingSetRecord> bindingSets = new LinkedBlockingQueue<>(); - - FluoClient fluo = null; - try { - PeriodicQueryResultStorage storage = getPeriodicQueryResultStorage(conf); - fluo = FluoClientFactory.getFluoClient(conf.getFluoAppName(), Optional.of(conf.getFluoTableName()), conf); - NotificationCoordinatorExecutor coordinator = getCoordinator(conf.getCoordinatorThreads(), notifications); - addRegisteredNotices(coordinator, fluo.newSnapshot()); - KafkaExporterExecutor exporter = getExporter(conf.getExporterThreads(), kafkaProps, bindingSets); - PeriodicQueryPrunerExecutor pruner = getPruner(storage, fluo, conf.getPrunerThreads(), bins); - NotificationProcessorExecutor processor = getProcessor(storage, notifications, bins, bindingSets, conf.getProcessorThreads()); - KafkaNotificationProvider provider = getProvider(conf.getProducerThreads(), conf.getNotificationTopic(), coordinator, kafkaProps); - return PeriodicNotificationApplication.builder().setCoordinator(coordinator).setProvider(provider).setExporter(exporter) - .setProcessor(processor).setPruner(pruner).build(); - } catch (AccumuloException | AccumuloSecurityException e) { - throw new PeriodicApplicationException(e.getMessage()); - } - } - - private static void addRegisteredNotices(NotificationCoordinatorExecutor coord, Snapshot sx) { - coord.start(); - PeriodicNotificationProvider provider = new PeriodicNotificationProvider(); - provider.processRegisteredNotifications(coord, sx); - } - - private static NotificationCoordinatorExecutor getCoordinator(int numThreads, BlockingQueue<TimestampedNotification> notifications) { - return new PeriodicNotificationCoordinatorExecutor(numThreads, notifications); - } - - private static KafkaExporterExecutor getExporter(int numThreads, Properties props, BlockingQueue<BindingSetRecord> bindingSets) { - KafkaProducer<String, BindingSet> producer = new KafkaProducer<>(props, new StringSerializer(), new BindingSetSerDe()); - return new KafkaExporterExecutor(producer, numThreads, bindingSets); - } - - private static PeriodicQueryPrunerExecutor getPruner(PeriodicQueryResultStorage storage, FluoClient fluo, int numThreads, - BlockingQueue<NodeBin> bins) { - return new PeriodicQueryPrunerExecutor(storage, fluo, numThreads, bins); - } - - private static NotificationProcessorExecutor getProcessor(PeriodicQueryResultStorage periodicStorage, - BlockingQueue<TimestampedNotification> notifications, BlockingQueue<NodeBin> bins, BlockingQueue<BindingSetRecord> bindingSets, - int numThreads) { - return new NotificationProcessorExecutor(periodicStorage, notifications, bins, bindingSets, numThreads); - } - - private static KafkaNotificationProvider getProvider(int numThreads, String topic, NotificationCoordinatorExecutor coord, - Properties props) { - return new KafkaNotificationProvider(topic, new StringDeserializer(), new CommandNotificationSerializer(), props, coord, - numThreads); - } - - private static PeriodicQueryResultStorage getPeriodicQueryResultStorage(PeriodicNotificationApplicationConfiguration conf) - throws AccumuloException, AccumuloSecurityException { - Instance instance = new ZooKeeperInstance(conf.getAccumuloInstance(), conf.getAccumuloZookeepers()); - Connector conn = instance.getConnector(conf.getAccumuloUser(), new PasswordToken(conf.getAccumuloPassword())); - String ryaInstance = conf.getTablePrefix(); - return new AccumuloPeriodicQueryResultStorage(conn, ryaInstance); - } - - private static Properties getKafkaProperties(PeriodicNotificationApplicationConfiguration conf) { - Properties kafkaProps = new Properties(); - kafkaProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, conf.getBootStrapServers()); - kafkaProps.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, conf.getNotificationClientId()); - kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, conf.getNotificationGroupId()); - kafkaProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - return kafkaProps; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/coordinator/PeriodicNotificationCoordinatorExecutor.java ---------------------------------------------------------------------- diff --git a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/coordinator/PeriodicNotificationCoordinatorExecutor.java b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/coordinator/PeriodicNotificationCoordinatorExecutor.java deleted file mode 100644 index 0486244..0000000 --- a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/coordinator/PeriodicNotificationCoordinatorExecutor.java +++ /dev/null @@ -1,159 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.rya.periodic.notification.coordinator; - -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.ReentrantLock; - -import org.apache.rya.periodic.notification.api.Notification; -import org.apache.rya.periodic.notification.api.NotificationCoordinatorExecutor; -import org.apache.rya.periodic.notification.api.NotificationProcessor; -import org.apache.rya.periodic.notification.notification.CommandNotification; -import org.apache.rya.periodic.notification.notification.PeriodicNotification; -import org.apache.rya.periodic.notification.notification.TimestampedNotification; -import org.apache.rya.periodic.notification.notification.CommandNotification.Command; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.base.Preconditions; - -/** - * Implementation of {@link NotificationCoordinatorExecutor} that generates regular notifications - * as indicated by {@link PeriodicNotification}s that are registered with this Object. When notifications - * are generated they are placed on a work queue to be processed by the {@link NotificationProcessor}. - * - */ -public class PeriodicNotificationCoordinatorExecutor implements NotificationCoordinatorExecutor { - - private static final Logger LOG = LoggerFactory.getLogger(PeriodicNotificationCoordinatorExecutor.class); - private int numThreads; - private ScheduledExecutorService producerThreadPool; - private Map<String, ScheduledFuture<?>> serviceMap = new HashMap<>(); - private BlockingQueue<TimestampedNotification> notifications; - private final ReentrantLock lock = new ReentrantLock(true); - private boolean running = false; - - public PeriodicNotificationCoordinatorExecutor(int numThreads, BlockingQueue<TimestampedNotification> notifications) { - this.numThreads = numThreads; - this.notifications = notifications; - } - - @Override - public void processNextCommandNotification(CommandNotification notification) { - lock.lock(); - try { - processNotification(notification); - } finally { - lock.unlock(); - } - } - - @Override - public void start() { - if (!running) { - producerThreadPool = Executors.newScheduledThreadPool(numThreads); - running = true; - } - } - - @Override - public void stop() { - - if (producerThreadPool != null) { - producerThreadPool.shutdown(); - } - - running = false; - - try { - if (!producerThreadPool.awaitTermination(5000, TimeUnit.MILLISECONDS)) { - producerThreadPool.shutdownNow(); - } - } catch (Exception e) { - LOG.info("Service Executor Shutdown has been called. Terminating NotificationRunnable"); - } - } - - private void processNotification(CommandNotification notification) { - Command command = notification.getCommand(); - Notification periodic = notification.getNotification(); - switch (command) { - case ADD: - addNotification(periodic); - break; - case DELETE: - deleteNotification(periodic); - break; - } - } - - private void addNotification(Notification notification) { - Preconditions.checkArgument(notification instanceof PeriodicNotification); - PeriodicNotification notify = (PeriodicNotification) notification; - if (!serviceMap.containsKey(notification.getId())) { - ScheduledFuture<?> future = producerThreadPool.scheduleAtFixedRate(new NotificationProducer(notify), notify.getInitialDelay(), - notify.getPeriod(), notify.getTimeUnit()); - serviceMap.put(notify.getId(), future); - } - } - - private boolean deleteNotification(Notification notification) { - if (serviceMap.containsKey(notification.getId())) { - ScheduledFuture<?> future = serviceMap.remove(notification.getId()); - future.cancel(true); - return true; - } - return false; - } - - /** - * Scheduled Task that places a {@link PeriodicNotification} - * in the work queue at regular intervals. - * - */ - class NotificationProducer implements Runnable { - - private PeriodicNotification notification; - - public NotificationProducer(PeriodicNotification notification) { - this.notification = notification; - } - - public void run() { - try { - notifications.put(new TimestampedNotification(notification)); - } catch (InterruptedException e) { - LOG.info("Unable to add notification. Process interrupted. "); - throw new RuntimeException(e); - } - } - - } - - @Override - public boolean currentlyRunning() { - return running; - } -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/exporter/KafkaExporterExecutor.java ---------------------------------------------------------------------- diff --git a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/exporter/KafkaExporterExecutor.java b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/exporter/KafkaExporterExecutor.java deleted file mode 100644 index c2e5ebf..0000000 --- a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/exporter/KafkaExporterExecutor.java +++ /dev/null @@ -1,110 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.rya.periodic.notification.exporter; - -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; - -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.log4j.Logger; -import org.apache.rya.periodic.notification.api.BindingSetExporter; -import org.apache.rya.periodic.notification.api.BindingSetRecord; -import org.apache.rya.periodic.notification.api.LifeCycle; -import org.openrdf.query.BindingSet; - -import jline.internal.Preconditions; - -/** - * Executor service that runs {@link KafkaPeriodicBindingSetExporter}s. - * - */ -public class KafkaExporterExecutor implements LifeCycle { - - private static final Logger log = Logger.getLogger(BindingSetExporter.class); - private KafkaProducer<String, BindingSet> producer; - private BlockingQueue<BindingSetRecord> bindingSets; - private ExecutorService executor; - private List<KafkaPeriodicBindingSetExporter> exporters; - private int num_Threads; - private boolean running = false; - - /** - * Creates a KafkaExporterExecutor for exporting periodic query results to Kafka. - * @param producer for publishing results to Kafka - * @param num_Threads number of threads used to publish results - * @param bindingSets - work queue containing {@link BindingSet}s to be published - */ - public KafkaExporterExecutor(KafkaProducer<String, BindingSet> producer, int num_Threads, BlockingQueue<BindingSetRecord> bindingSets) { - Preconditions.checkNotNull(producer); - Preconditions.checkNotNull(bindingSets); - this.producer = producer; - this.bindingSets = bindingSets; - this.num_Threads = num_Threads; - this.exporters = new ArrayList<>(); - } - - @Override - public void start() { - if (!running) { - executor = Executors.newFixedThreadPool(num_Threads); - - for (int threadNumber = 0; threadNumber < num_Threads; threadNumber++) { - log.info("Creating exporter:" + threadNumber); - KafkaPeriodicBindingSetExporter exporter = new KafkaPeriodicBindingSetExporter(producer, threadNumber, bindingSets); - exporters.add(exporter); - executor.submit(exporter); - } - running = true; - } - } - - @Override - public void stop() { - if (executor != null) { - executor.shutdown(); - } - - if (exporters != null && exporters.size() > 0) { - exporters.forEach(x -> x.shutdown()); - } - - if (producer != null) { - producer.close(); - } - - running = false; - try { - if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) { - log.info("Timed out waiting for consumer threads to shut down, exiting uncleanly"); - executor.shutdownNow(); - } - } catch (InterruptedException e) { - log.info("Interrupted during shutdown, exiting uncleanly"); - } - } - - @Override - public boolean currentlyRunning() { - return running; - } -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/exporter/KafkaPeriodicBindingSetExporter.java ---------------------------------------------------------------------- diff --git a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/exporter/KafkaPeriodicBindingSetExporter.java b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/exporter/KafkaPeriodicBindingSetExporter.java deleted file mode 100644 index 8a0322f..0000000 --- a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/exporter/KafkaPeriodicBindingSetExporter.java +++ /dev/null @@ -1,99 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.rya.periodic.notification.exporter; - -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicBoolean; - -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.clients.producer.RecordMetadata; -import org.apache.log4j.Logger; -import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants; -import org.apache.rya.periodic.notification.api.BindingSetExporter; -import org.apache.rya.periodic.notification.api.BindingSetRecord; -import org.apache.rya.periodic.notification.api.BindingSetRecordExportException; -import org.openrdf.model.Literal; -import org.openrdf.query.BindingSet; - -import jline.internal.Preconditions; - -/** - * Object that exports {@link BindingSet}s to the Kafka topic indicated by - * the {@link BindingSetRecord}. - * - */ -public class KafkaPeriodicBindingSetExporter implements BindingSetExporter, Runnable { - - private static final Logger log = Logger.getLogger(BindingSetExporter.class); - private KafkaProducer<String, BindingSet> producer; - private BlockingQueue<BindingSetRecord> bindingSets; - private AtomicBoolean closed = new AtomicBoolean(false); - private int threadNumber; - - public KafkaPeriodicBindingSetExporter(KafkaProducer<String, BindingSet> producer, int threadNumber, - BlockingQueue<BindingSetRecord> bindingSets) { - Preconditions.checkNotNull(producer); - Preconditions.checkNotNull(bindingSets); - this.threadNumber = threadNumber; - this.producer = producer; - this.bindingSets = bindingSets; - } - - /** - * Exports BindingSets to Kafka. The BindingSet and topic are extracted from - * the indicated BindingSetRecord and the BindingSet is then exported to the topic. - */ - @Override - public void exportNotification(BindingSetRecord record) throws BindingSetRecordExportException { - String bindingName = IncrementalUpdateConstants.PERIODIC_BIN_ID; - BindingSet bindingSet = record.getBindingSet(); - String topic = record.getTopic(); - long binId = ((Literal) bindingSet.getValue(bindingName)).longValue(); - final Future<RecordMetadata> future = producer - .send(new ProducerRecord<String, BindingSet>(topic, Long.toString(binId), bindingSet)); - try { - //wait for confirmation that results have been received - future.get(5, TimeUnit.SECONDS); - } catch (InterruptedException | ExecutionException | TimeoutException e) { - throw new BindingSetRecordExportException(e.getMessage()); - } - } - - @Override - public void run() { - try { - while (!closed.get()) { - exportNotification(bindingSets.take()); - } - } catch (InterruptedException | BindingSetRecordExportException e) { - log.trace("Thread " + threadNumber + " is unable to process message."); - } - } - - - public void shutdown() { - closed.set(true); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/processor/NotificationProcessorExecutor.java ---------------------------------------------------------------------- diff --git a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/processor/NotificationProcessorExecutor.java b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/processor/NotificationProcessorExecutor.java deleted file mode 100644 index a9a5ad1..0000000 --- a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/processor/NotificationProcessorExecutor.java +++ /dev/null @@ -1,114 +0,0 @@ - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */package org.apache.rya.periodic.notification.processor; - -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; - -import org.apache.log4j.Logger; -import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage; -import org.apache.rya.periodic.notification.api.BindingSetRecord; -import org.apache.rya.periodic.notification.api.LifeCycle; -import org.apache.rya.periodic.notification.api.NodeBin; -import org.apache.rya.periodic.notification.notification.TimestampedNotification; - -import com.google.common.base.Preconditions; - -/** - * Executor service that runs {@link TimestampedNotificationProcessor}s with basic - * functionality for starting, stopping, and determining whether notification processors are - * being executed. - * - */ -public class NotificationProcessorExecutor implements LifeCycle { - - private static final Logger log = Logger.getLogger(TimestampedNotificationProcessor.class); - private BlockingQueue<TimestampedNotification> notifications; // notifications - private BlockingQueue<NodeBin> bins; // entries to delete from Fluo - private BlockingQueue<BindingSetRecord> bindingSets; // query results to - // export - private PeriodicQueryResultStorage periodicStorage; - private List<TimestampedNotificationProcessor> processors; - private int numberThreads; - private ExecutorService executor; - private boolean running = false; - - /** - * Creates NotificationProcessorExecutor. - * @param periodicStorage - storage layer that periodic results are read from - * @param notifications - notifications are pulled from this queue, and the timestamp indicates which bin of results to query for - * @param bins - after notifications are processed, they are added to the bin to be deleted - * @param bindingSets - results read from the storage layer to be exported - * @param numberThreads - number of threads used for processing - */ - public NotificationProcessorExecutor(PeriodicQueryResultStorage periodicStorage, BlockingQueue<TimestampedNotification> notifications, - BlockingQueue<NodeBin> bins, BlockingQueue<BindingSetRecord> bindingSets, int numberThreads) { - this.notifications = Preconditions.checkNotNull(notifications); - this.bins = Preconditions.checkNotNull(bins); - this.bindingSets = Preconditions.checkNotNull(bindingSets); - this.periodicStorage = periodicStorage; - this.numberThreads = numberThreads; - processors = new ArrayList<>(); - } - - @Override - public void start() { - if (!running) { - executor = Executors.newFixedThreadPool(numberThreads); - for (int threadNumber = 0; threadNumber < numberThreads; threadNumber++) { - log.info("Creating exporter:" + threadNumber); - TimestampedNotificationProcessor processor = TimestampedNotificationProcessor.builder().setBindingSets(bindingSets) - .setBins(bins).setPeriodicStorage(periodicStorage).setNotifications(notifications).setThreadNumber(threadNumber) - .build(); - processors.add(processor); - executor.submit(processor); - } - running = true; - } - } - - @Override - public void stop() { - if (processors != null && processors.size() > 0) { - processors.forEach(x -> x.shutdown()); - } - if (executor != null) { - executor.shutdown(); - } - running = false; - try { - if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) { - log.info("Timed out waiting for consumer threads to shut down, exiting uncleanly"); - executor.shutdownNow(); - } - } catch (InterruptedException e) { - log.info("Interrupted during shutdown, exiting uncleanly"); - } - } - - @Override - public boolean currentlyRunning() { - return running; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/processor/TimestampedNotificationProcessor.java ---------------------------------------------------------------------- diff --git a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/processor/TimestampedNotificationProcessor.java b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/processor/TimestampedNotificationProcessor.java deleted file mode 100644 index 8b65683..0000000 --- a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/processor/TimestampedNotificationProcessor.java +++ /dev/null @@ -1,203 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.rya.periodic.notification.processor; - -import java.util.Optional; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.atomic.AtomicBoolean; - -import org.apache.log4j.Logger; -import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage; -import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator; -import org.apache.rya.periodic.notification.api.BinPruner; -import org.apache.rya.periodic.notification.api.BindingSetRecord; -import org.apache.rya.periodic.notification.api.NodeBin; -import org.apache.rya.periodic.notification.api.NotificationProcessor; -import org.apache.rya.periodic.notification.exporter.KafkaPeriodicBindingSetExporter; -import org.apache.rya.periodic.notification.notification.TimestampedNotification; -import org.openrdf.query.BindingSet; - -import com.google.common.base.Preconditions; - -/** - * Implementation of {@link NotificationProcessor} that uses the id indicated by - * the {@link TimestampedNotification} to obtain results from the - * {@link PeriodicQueryResultStorage} layer containing the results of the - * Periodic Query. The TimestampedNotificationProcessor then parses the results - * and adds them to work queues to be processed by the {@link BinPruner} and the - * {@link KafkaPeriodicBindingSetExporter}. - * - */ -public class TimestampedNotificationProcessor implements NotificationProcessor, Runnable { - - private static final Logger log = Logger.getLogger(TimestampedNotificationProcessor.class); - private PeriodicQueryResultStorage periodicStorage; - private BlockingQueue<TimestampedNotification> notifications; // notifications - // to process - private BlockingQueue<NodeBin> bins; // entries to delete from Fluo - private BlockingQueue<BindingSetRecord> bindingSets; // query results to export - private AtomicBoolean closed = new AtomicBoolean(false); - private int threadNumber; - - - public TimestampedNotificationProcessor(PeriodicQueryResultStorage periodicStorage, - BlockingQueue<TimestampedNotification> notifications, BlockingQueue<NodeBin> bins, BlockingQueue<BindingSetRecord> bindingSets, - int threadNumber) { - this.notifications = Preconditions.checkNotNull(notifications); - this.bins = Preconditions.checkNotNull(bins); - this.bindingSets = Preconditions.checkNotNull(bindingSets); - this.periodicStorage = periodicStorage; - this.threadNumber = threadNumber; - } - - /** - * Processes the TimestampNotifications by scanning the PCJ tables for - * entries in the bin corresponding to - * {@link TimestampedNotification#getTimestamp()} and adding them to the - * export BlockingQueue. The TimestampNotification is then used to form a - * {@link NodeBin} that is passed to the BinPruner BlockingQueue so that the - * bins can be deleted from Fluo and Accumulo. - */ - @Override - public void processNotification(TimestampedNotification notification) { - - String id = notification.getId(); - long ts = notification.getTimestamp().getTime(); - long period = notification.getPeriod(); - long bin = getBinFromTimestamp(ts, period); - NodeBin nodeBin = new NodeBin(id, bin); - - try (CloseableIterator<BindingSet> iter = periodicStorage.listResults(id, Optional.of(bin));) { - - while(iter.hasNext()) { - bindingSets.add(new BindingSetRecord(iter.next(), id)); - } - // add NodeBin to BinPruner queue so that bin can be deleted from - // Fluo and Accumulo - bins.add(nodeBin); - } catch (Exception e) { - log.debug("Encountered error: " + e.getMessage() + " while accessing periodic results for bin: " + bin + " for query: " + id); - } - } - - /** - * Computes left bin end point containing event time ts - * - * @param ts - event time - * @param start - time that periodic event began - * @param period - length of period - * @return left bin end point containing event time ts - */ - private long getBinFromTimestamp(long ts, long period) { - Preconditions.checkArgument(period > 0); - return (ts / period) * period; - } - - @Override - public void run() { - try { - while(!closed.get()) { - processNotification(notifications.take()); - } - } catch (Exception e) { - log.trace("Thread_" + threadNumber + " is unable to process next notification."); - throw new RuntimeException(e); - } - - } - - public void shutdown() { - closed.set(true); - } - - public static Builder builder() { - return new Builder(); - } - - - - public static class Builder { - - private PeriodicQueryResultStorage periodicStorage; - private BlockingQueue<TimestampedNotification> notifications; // notifications to process - private BlockingQueue<NodeBin> bins; // entries to delete from Fluo - private BlockingQueue<BindingSetRecord> bindingSets; // query results to export - - private int threadNumber; - - /** - * Set notification queue - * @param notifications - work queue containing notifications to be processed - * @return this Builder for chaining method calls - */ - public Builder setNotifications(BlockingQueue<TimestampedNotification> notifications) { - this.notifications = notifications; - return this; - } - - /** - * Set nodeBin queue - * @param bins - work queue containing NodeBins to be pruned - * @return this Builder for chaining method calls - */ - public Builder setBins(BlockingQueue<NodeBin> bins) { - this.bins = bins; - return this; - } - - /** - * Set BindingSet queue - * @param bindingSets - work queue containing BindingSets to be exported - * @return this Builder for chaining method calls - */ - public Builder setBindingSets(BlockingQueue<BindingSetRecord> bindingSets) { - this.bindingSets = bindingSets; - return this; - } - - /** - * Sets the number of threads used by this processor - * @param threadNumber - number of threads used by this processor - * @return - number of threads used by this processor - */ - public Builder setThreadNumber(int threadNumber) { - this.threadNumber = threadNumber; - return this; - } - - /** - * Set the PeriodicStorage layer - * @param periodicStorage - periodic storage layer that periodic results are read from - * @return - this Builder for chaining method calls - */ - public Builder setPeriodicStorage(PeriodicQueryResultStorage periodicStorage) { - this.periodicStorage = periodicStorage; - return this; - } - - /** - * Builds a TimestampedNotificationProcessor - * @return - TimestampedNotificationProcessor built from arguments passed to this Builder - */ - public TimestampedNotificationProcessor build() { - return new TimestampedNotificationProcessor(periodicStorage, notifications, bins, bindingSets, threadNumber); - } - - } -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/pruner/AccumuloBinPruner.java ---------------------------------------------------------------------- diff --git a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/pruner/AccumuloBinPruner.java b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/pruner/AccumuloBinPruner.java deleted file mode 100644 index 4dac64c..0000000 --- a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/pruner/AccumuloBinPruner.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.rya.periodic.notification.pruner; - -import org.apache.log4j.Logger; -import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage; -import org.apache.rya.indexing.pcj.storage.PeriodicQueryStorageException; -import org.apache.rya.periodic.notification.api.BinPruner; -import org.apache.rya.periodic.notification.api.NodeBin; - -import jline.internal.Preconditions; - -/** - * Deletes BindingSets from time bins in the indicated PCJ table - */ -public class AccumuloBinPruner implements BinPruner { - - private static final Logger log = Logger.getLogger(AccumuloBinPruner.class); - private PeriodicQueryResultStorage periodicStorage; - - public AccumuloBinPruner(PeriodicQueryResultStorage periodicStorage) { - Preconditions.checkNotNull(periodicStorage); - this.periodicStorage = periodicStorage; - } - - /** - * This method deletes all BindingSets in the indicated bin from the PCJ - * table indicated by the id. It is assumed that all BindingSet entries for - * the corresponding bin are written to the PCJ table so that the bin Id - * occurs first. - * - * @param id - * - pcj table id - * @param bin - * - temporal bin the BindingSets are contained in - */ - @Override - public void pruneBindingSetBin(NodeBin nodeBin) { - Preconditions.checkNotNull(nodeBin); - String id = nodeBin.getNodeId(); - long bin = nodeBin.getBin(); - try { - periodicStorage.deletePeriodicQueryResults(id, bin); - } catch (PeriodicQueryStorageException e) { - log.trace("Unable to delete results from Peroidic Table: " + id + " for bin: " + bin); - throw new RuntimeException(e); - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/pruner/FluoBinPruner.java ---------------------------------------------------------------------- diff --git a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/pruner/FluoBinPruner.java b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/pruner/FluoBinPruner.java deleted file mode 100644 index bee9c02..0000000 --- a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/pruner/FluoBinPruner.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.rya.periodic.notification.pruner; - -import org.apache.fluo.api.client.FluoClient; -import org.apache.fluo.api.client.Transaction; -import org.apache.fluo.api.data.Bytes; -import org.apache.fluo.api.data.Column; -import org.apache.fluo.api.data.Span; -import org.apache.log4j.Logger; -import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants; -import org.apache.rya.indexing.pcj.fluo.app.NodeType; -import org.apache.rya.indexing.pcj.fluo.app.batch.BatchInformationDAO; -import org.apache.rya.indexing.pcj.fluo.app.batch.SpanBatchDeleteInformation; -import org.apache.rya.periodic.notification.api.BinPruner; -import org.apache.rya.periodic.notification.api.NodeBin; - -import com.google.common.base.Optional; - -/** - * Deletes {@link BindingSet}s from the indicated Fluo table. - */ -public class FluoBinPruner implements BinPruner { - - private static final Logger log = Logger.getLogger(FluoBinPruner.class); - private FluoClient client; - - public FluoBinPruner(FluoClient client) { - this.client = client; - } - - /** - * This method deletes BindingSets in the specified bin from the BindingSet - * Column of the indicated Fluo nodeId - * - * @param id - * - Fluo nodeId - * @param bin - * - bin id - */ - @Override - public void pruneBindingSetBin(NodeBin nodeBin) { - String id = nodeBin.getNodeId(); - long bin = nodeBin.getBin(); - try (Transaction tx = client.newTransaction()) { - Optional<NodeType> type = NodeType.fromNodeId(id); - if (!type.isPresent()) { - log.trace("Unable to determine NodeType from id: " + id); - throw new RuntimeException(); - } - Column batchInfoColumn = type.get().getResultColumn(); - String batchInfoSpanPrefix = id + IncrementalUpdateConstants.NODEID_BS_DELIM + bin; - SpanBatchDeleteInformation batchInfo = SpanBatchDeleteInformation.builder().setColumn(batchInfoColumn) - .setSpan(Span.prefix(Bytes.of(batchInfoSpanPrefix))).build(); - BatchInformationDAO.addBatch(tx, id, batchInfo); - tx.commit(); - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/pruner/PeriodicQueryPruner.java ---------------------------------------------------------------------- diff --git a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/pruner/PeriodicQueryPruner.java b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/pruner/PeriodicQueryPruner.java deleted file mode 100644 index 516690e..0000000 --- a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/pruner/PeriodicQueryPruner.java +++ /dev/null @@ -1,107 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.rya.periodic.notification.pruner; - -import java.util.HashSet; -import java.util.Set; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.atomic.AtomicBoolean; - -import org.apache.fluo.api.client.FluoClient; -import org.apache.fluo.api.client.Snapshot; -import org.apache.fluo.api.client.SnapshotBase; -import org.apache.log4j.Logger; -import org.apache.rya.indexing.pcj.fluo.app.NodeType; -import org.apache.rya.indexing.pcj.fluo.app.util.PeriodicQueryUtil; -import org.apache.rya.periodic.notification.api.BinPruner; -import org.apache.rya.periodic.notification.api.NodeBin; - -import jline.internal.Preconditions; - -/** - * Implementation of {@link BinPruner} that deletes old, already processed - * Periodic Query results from Fluo and the PCJ table to which the Fluo results - * are exported. - * - */ -public class PeriodicQueryPruner implements BinPruner, Runnable { - - private static final Logger log = Logger.getLogger(PeriodicQueryPruner.class); - private FluoClient client; - private AccumuloBinPruner accPruner; - private FluoBinPruner fluoPruner; - private BlockingQueue<NodeBin> bins; - private AtomicBoolean closed = new AtomicBoolean(false); - private int threadNumber; - - public PeriodicQueryPruner(FluoBinPruner fluoPruner, AccumuloBinPruner accPruner, FluoClient client, BlockingQueue<NodeBin> bins, int threadNumber) { - this.fluoPruner = Preconditions.checkNotNull(fluoPruner); - this.accPruner = Preconditions.checkNotNull(accPruner); - this.client = Preconditions.checkNotNull(client); - this.bins = Preconditions.checkNotNull(bins); - this.threadNumber = threadNumber; - } - - @Override - public void run() { - try { - while (!closed.get()) { - pruneBindingSetBin(bins.take()); - } - } catch (InterruptedException e) { - log.trace("Thread " + threadNumber + " is unable to prune the next message."); - throw new RuntimeException(e); - } - } - - /** - * Prunes BindingSet bins from the Rya Fluo Application in addition to the BindingSet - * bins created in the PCJ tables associated with the give query id. - * @param id - QueryResult Id for the Rya Fluo application - * @param bin - bin id for bins to be deleted - */ - @Override - public void pruneBindingSetBin(NodeBin nodeBin) { - String pcjId = nodeBin.getNodeId(); - long bin = nodeBin.getBin(); - try(Snapshot sx = client.newSnapshot()) { - String queryId = NodeType.generateNewIdForType(NodeType.QUERY, pcjId); - Set<String> fluoIds = getNodeIdsFromResultId(sx, queryId); - accPruner.pruneBindingSetBin(nodeBin); - for(String fluoId: fluoIds) { - fluoPruner.pruneBindingSetBin(new NodeBin(fluoId, bin)); - } - } catch (Exception e) { - log.trace("Could not successfully initialize PeriodicQueryBinPruner."); - } - } - - - public void shutdown() { - closed.set(true); - } - - private Set<String> getNodeIdsFromResultId(SnapshotBase sx, String id) { - Set<String> ids = new HashSet<>(); - PeriodicQueryUtil.getPeriodicQueryNodeAncestorIds(sx, id, ids); - return ids; - } - - -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/pruner/PeriodicQueryPrunerExecutor.java ---------------------------------------------------------------------- diff --git a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/pruner/PeriodicQueryPrunerExecutor.java b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/pruner/PeriodicQueryPrunerExecutor.java deleted file mode 100644 index 1c11f96..0000000 --- a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/pruner/PeriodicQueryPrunerExecutor.java +++ /dev/null @@ -1,104 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.rya.periodic.notification.pruner; - -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; - -import org.apache.fluo.api.client.FluoClient; -import org.apache.log4j.Logger; -import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage; -import org.apache.rya.periodic.notification.api.LifeCycle; -import org.apache.rya.periodic.notification.api.NodeBin; - -import com.google.common.base.Preconditions; - -/** - * Executor service that runs {@link PeriodicQueryPruner}s with added functionality - * for starting, stopping, and determining if the query pruners are running. - */ -public class PeriodicQueryPrunerExecutor implements LifeCycle { - - private static final Logger log = Logger.getLogger(PeriodicQueryPrunerExecutor.class); - private FluoClient client; - private int numThreads; - private ExecutorService executor; - private BlockingQueue<NodeBin> bins; - private PeriodicQueryResultStorage periodicStorage; - private List<PeriodicQueryPruner> pruners; - private boolean running = false; - - public PeriodicQueryPrunerExecutor(PeriodicQueryResultStorage periodicStorage, FluoClient client, int numThreads, - BlockingQueue<NodeBin> bins) { - Preconditions.checkArgument(numThreads > 0); - this.periodicStorage = periodicStorage; - this.numThreads = numThreads; - executor = Executors.newFixedThreadPool(numThreads); - this.bins = bins; - this.client = client; - this.pruners = new ArrayList<>(); - } - - @Override - public void start() { - if (!running) { - AccumuloBinPruner accPruner = new AccumuloBinPruner(periodicStorage); - FluoBinPruner fluoPruner = new FluoBinPruner(client); - - for (int threadNumber = 0; threadNumber < numThreads; threadNumber++) { - PeriodicQueryPruner pruner = new PeriodicQueryPruner(fluoPruner, accPruner, client, bins, threadNumber); - pruners.add(pruner); - executor.submit(pruner); - } - running = true; - } - } - - @Override - public void stop() { - if (pruners != null && pruners.size() > 0) { - pruners.forEach(x -> x.shutdown()); - } - if(client != null) { - client.close(); - } - if (executor != null) { - executor.shutdown(); - running = false; - } - try { - if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) { - log.info("Timed out waiting for consumer threads to shut down, exiting uncleanly"); - executor.shutdownNow(); - } - } catch (InterruptedException e) { - log.info("Interrupted during shutdown, exiting uncleanly"); - } - } - - @Override - public boolean currentlyRunning() { - return running; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/recovery/PeriodicNotificationProvider.java ---------------------------------------------------------------------- diff --git a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/recovery/PeriodicNotificationProvider.java b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/recovery/PeriodicNotificationProvider.java deleted file mode 100644 index 69bd39c..0000000 --- a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/recovery/PeriodicNotificationProvider.java +++ /dev/null @@ -1,142 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.rya.periodic.notification.recovery; - -import java.util.Collection; -import java.util.HashSet; -import java.util.Iterator; -import java.util.Set; -import java.util.concurrent.TimeUnit; - -import org.apache.fluo.api.client.Snapshot; -import org.apache.fluo.api.client.scanner.ColumnScanner; -import org.apache.fluo.api.client.scanner.RowScanner; -import org.apache.fluo.api.data.Bytes; -import org.apache.fluo.api.data.ColumnValue; -import org.apache.fluo.api.data.Span; -import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants; -import org.apache.rya.indexing.pcj.fluo.app.NodeType; -import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; -import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO; -import org.apache.rya.indexing.pcj.fluo.app.query.PeriodicQueryMetadata; -import org.apache.rya.indexing.pcj.fluo.app.util.FluoQueryUtils; -import org.apache.rya.periodic.notification.api.NotificationCoordinatorExecutor; -import org.apache.rya.periodic.notification.coordinator.PeriodicNotificationCoordinatorExecutor; -import org.apache.rya.periodic.notification.notification.CommandNotification; -import org.apache.rya.periodic.notification.notification.CommandNotification.Command; -import org.apache.rya.periodic.notification.notification.PeriodicNotification; - -/** - * This class is used by the {@link PeriodicNotificationCoordinatorExecutor} - * to add all existing {@link PeriodicNotification}s stored in Fluo when it is - * initialized. This enables the the {@link PeriodicServiceApplication} to be - * recovered from failure by restoring it original state. - * - */ -public class PeriodicNotificationProvider { - - private FluoQueryMetadataDAO dao; - - public PeriodicNotificationProvider() { - this.dao = new FluoQueryMetadataDAO(); - } - - /** - * Retrieve all of the information about Periodic Query results already registered - * with Fluo. This is returned in the form of {@link CommandNotification}s that - * can be registered with the {@link NotificationCoordinatorExecutor}. - * @param sx - snapshot for reading results from Fluo - * @return - collection of CommandNotifications that indicate Periodic Query information registered with system - */ - public Collection<CommandNotification> getNotifications(Snapshot sx) { - Set<PeriodicQueryMetadata> periodicMetadata = new HashSet<>(); - RowScanner scanner = sx.scanner().fetch(FluoQueryColumns.PERIODIC_QUERY_NODE_ID) - .over(Span.prefix(IncrementalUpdateConstants.PERIODIC_QUERY_PREFIX)).byRow().build(); - Iterator<ColumnScanner> colScannerIter = scanner.iterator(); - while (colScannerIter.hasNext()) { - ColumnScanner colScanner = colScannerIter.next(); - Iterator<ColumnValue> values = colScanner.iterator(); - while (values.hasNext()) { - PeriodicQueryMetadata metadata = dao.readPeriodicQueryMetadata(sx, values.next().getsValue()); - periodicMetadata.add(metadata); - } - } - return getCommandNotifications(sx, periodicMetadata); - } - - /** - * Registers all of Periodic Query information already contained within Fluo to the - * {@link NotificationCoordinatorExecutor}. - * @param coordinator - coordinator that periodic info will be registered with - * @param sx - snapshot for reading results from Fluo - */ - public void processRegisteredNotifications(NotificationCoordinatorExecutor coordinator, Snapshot sx) { - coordinator.start(); - Collection<CommandNotification> notifications = getNotifications(sx); - for(CommandNotification notification: notifications) { - coordinator.processNextCommandNotification(notification); - } - } - - private Collection<CommandNotification> getCommandNotifications(Snapshot sx, Collection<PeriodicQueryMetadata> metadata) { - Set<CommandNotification> notifications = new HashSet<>(); - int i = 1; - for(PeriodicQueryMetadata meta:metadata) { - //offset initial wait to avoid overloading system - PeriodicNotification periodic = new PeriodicNotification(getQueryId(meta.getNodeId(), sx), meta.getPeriod(),TimeUnit.MILLISECONDS,i*5000); - notifications.add(new CommandNotification(Command.ADD, periodic)); - i++; - } - return notifications; - } - - private String getQueryId(String periodicNodeId, Snapshot sx) { - return getQueryIdFromPeriodicId(sx, periodicNodeId); - } - - private String getQueryIdFromPeriodicId(Snapshot sx, String nodeId) { - NodeType nodeType = NodeType.fromNodeId(nodeId).orNull(); - String id = null; - switch (nodeType) { - case FILTER: - id = getQueryIdFromPeriodicId(sx, sx.get(Bytes.of(nodeId), FluoQueryColumns.FILTER_PARENT_NODE_ID).toString()); - break; - case PERIODIC_QUERY: - id = getQueryIdFromPeriodicId(sx, sx.get(Bytes.of(nodeId), FluoQueryColumns.PERIODIC_QUERY_PARENT_NODE_ID).toString()); - break; - case QUERY: - id = FluoQueryUtils.convertFluoQueryIdToPcjId(nodeId); - break; - case AGGREGATION: - id = getQueryIdFromPeriodicId(sx, sx.get(Bytes.of(nodeId), FluoQueryColumns.AGGREGATION_PARENT_NODE_ID).toString()); - break; - case CONSTRUCT: - id = getQueryIdFromPeriodicId(sx, sx.get(Bytes.of(nodeId), FluoQueryColumns.CONSTRUCT_PARENT_NODE_ID).toString()); - break; - case PROJECTION: - id = getQueryIdFromPeriodicId(sx, sx.get(Bytes.of(nodeId), FluoQueryColumns.PROJECTION_PARENT_NODE_ID).toString()); - break; - default: - throw new IllegalArgumentException("Invalid node type"); - - } - return id; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/registration/kafka/KafkaNotificationProvider.java ---------------------------------------------------------------------- diff --git a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/registration/kafka/KafkaNotificationProvider.java b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/registration/kafka/KafkaNotificationProvider.java deleted file mode 100644 index f5cd13a..0000000 --- a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/registration/kafka/KafkaNotificationProvider.java +++ /dev/null @@ -1,123 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.rya.periodic.notification.registration.kafka; - -import java.util.ArrayList; -import java.util.List; -import java.util.Properties; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; - -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.common.serialization.Deserializer; -import org.apache.rya.periodic.notification.api.LifeCycle; -import org.apache.rya.periodic.notification.api.Notification; -import org.apache.rya.periodic.notification.api.NotificationCoordinatorExecutor; -import org.apache.rya.periodic.notification.notification.CommandNotification; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Consumer group to pull all requests for adding and deleting {@link Notification}s - * from Kafka. This Object executes {@link PeriodicNotificationConsumer}s that retrieve - * the {@link CommandNotification}s and register them with the {@link NotificationCoordinatorExecutor}. - * - */ -public class KafkaNotificationProvider implements LifeCycle { - private static final Logger LOG = LoggerFactory.getLogger(KafkaNotificationProvider.class); - private String topic; - private ExecutorService executor; - private NotificationCoordinatorExecutor coord; - private Properties props; - private int numThreads; - private boolean running = false; - Deserializer<String> keyDe; - Deserializer<CommandNotification> valDe; - List<PeriodicNotificationConsumer> consumers; - - /** - * Create KafkaNotificationProvider for reading new notification requests form Kafka - * @param topic - notification topic - * @param keyDe - Kafka message key deserializer - * @param valDe - Kafka message value deserializer - * @param props - properties used to creates a {@link KafkaConsumer} - * @param coord - {@link NotificationCoordinatorExecutor} for managing and generating notifications - * @param numThreads - number of threads used by this notification provider - */ - public KafkaNotificationProvider(String topic, Deserializer<String> keyDe, Deserializer<CommandNotification> valDe, Properties props, - NotificationCoordinatorExecutor coord, int numThreads) { - this.coord = coord; - this.numThreads = numThreads; - this.topic = topic; - this.props = props; - this.consumers = new ArrayList<>(); - this.keyDe = keyDe; - this.valDe = valDe; - } - - @Override - public void stop() { - if (consumers != null && consumers.size() > 0) { - for (PeriodicNotificationConsumer consumer : consumers) { - consumer.shutdown(); - } - } - if (executor != null) { - executor.shutdown(); - } - running = false; - try { - if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) { - LOG.info("Timed out waiting for consumer threads to shut down, exiting uncleanly"); - executor.shutdownNow(); - } - } catch (InterruptedException e) { - LOG.info("Interrupted during shutdown, exiting uncleanly"); - } - } - - public void start() { - if (!running) { - if (!coord.currentlyRunning()) { - coord.start(); - } - // now launch all the threads - executor = Executors.newFixedThreadPool(numThreads); - - // now create consumers to consume the messages - int threadNumber = 0; - for (int i = 0; i < numThreads; i++) { - LOG.info("Creating consumer:" + threadNumber); - KafkaConsumer<String, CommandNotification> consumer = new KafkaConsumer<String, CommandNotification>(props, keyDe, valDe); - PeriodicNotificationConsumer periodicConsumer = new PeriodicNotificationConsumer(topic, consumer, threadNumber, coord); - consumers.add(periodicConsumer); - executor.submit(periodicConsumer); - threadNumber++; - } - running = true; - } - } - - @Override - public boolean currentlyRunning() { - return running; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/registration/kafka/PeriodicNotificationConsumer.java ---------------------------------------------------------------------- diff --git a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/registration/kafka/PeriodicNotificationConsumer.java b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/registration/kafka/PeriodicNotificationConsumer.java deleted file mode 100644 index 6785ce8..0000000 --- a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/registration/kafka/PeriodicNotificationConsumer.java +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.rya.periodic.notification.registration.kafka; - -import java.util.Arrays; -import java.util.concurrent.atomic.AtomicBoolean; - -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.common.errors.WakeupException; -import org.apache.log4j.Logger; -import org.apache.rya.periodic.notification.api.NotificationCoordinatorExecutor; -import org.apache.rya.periodic.notification.notification.CommandNotification; - -/** - * Consumer for the {@link KafkaNotificationProvider}. This consumer pull messages - * from Kafka and registers them with the {@link NotificationCoordinatorExecutor}. - * - */ -public class PeriodicNotificationConsumer implements Runnable { - private KafkaConsumer<String, CommandNotification> consumer; - private int m_threadNumber; - private String topic; - private final AtomicBoolean closed = new AtomicBoolean(false); - private NotificationCoordinatorExecutor coord; - private static final Logger LOG = Logger.getLogger(PeriodicNotificationConsumer.class); - - /** - * Creates a new PeriodicNotificationConsumer for consuming new notification requests from - * Kafka. - * @param topic - new notification topic - * @param consumer - consumer for pulling new requests from Kafka - * @param a_threadNumber - number of consumer threads to be used - * @param coord - notification coordinator for managing and generating notifications - */ - public PeriodicNotificationConsumer(String topic, KafkaConsumer<String, CommandNotification> consumer, int a_threadNumber, - NotificationCoordinatorExecutor coord) { - this.topic = topic; - m_threadNumber = a_threadNumber; - this.consumer = consumer; - this.coord = coord; - } - - public void run() { - - try { - LOG.info("Creating kafka stream for consumer:" + m_threadNumber); - consumer.subscribe(Arrays.asList(topic)); - while (!closed.get()) { - ConsumerRecords<String, CommandNotification> records = consumer.poll(10000); - // Handle new records - for(ConsumerRecord<String, CommandNotification> record: records) { - CommandNotification notification = record.value(); - LOG.info("Thread " + m_threadNumber + " is adding notification " + notification + " to queue."); - LOG.info("Message: " + notification); - coord.processNextCommandNotification(notification); - } - } - } catch (WakeupException e) { - // Ignore exception if closing - if (!closed.get()) throw e; - } finally { - consumer.close(); - } - } - - public void shutdown() { - closed.set(true); - consumer.wakeup(); - } -}