Github user ejwhite922 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/272#discussion_r167242070 --- Diff: extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryManager.java --- @@ -0,0 +1,884 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.streams.querymanager; + +import static java.util.Objects.requireNonNull; + +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.rya.streams.api.entity.StreamsQuery; +import org.apache.rya.streams.api.queries.ChangeLogEntry; +import org.apache.rya.streams.api.queries.InMemoryQueryRepository; +import org.apache.rya.streams.api.queries.QueryChange; +import org.apache.rya.streams.api.queries.QueryChangeLog; +import org.apache.rya.streams.api.queries.QueryChangeLogListener; +import org.apache.rya.streams.api.queries.QueryRepository; +import org.apache.rya.streams.querymanager.QueryChangeLogSource.SourceListener; +import org.apache.rya.streams.querymanager.QueryExecutor.QueryExecutorException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.AbstractScheduledService.Scheduler; +import com.google.common.util.concurrent.AbstractService; +import com.google.common.util.concurrent.UncheckedExecutionException; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * A service for managing {@link StreamsQuery} running on a Rya Streams system. + * <p> + * Only one QueryManager needs to be running to manage any number of rya + * instances/rya streams instances. + */ +@DefaultAnnotation(NonNull.class) +public class QueryManager extends AbstractService { + private static final Logger log = LoggerFactory.getLogger(QueryManager.class); + + /** + * The source of {@link QueryChangeLog}s. Each log discovered is bound to a specific + * Rya instnace. + */ + private final QueryChangeLogSource changeLogSource; + + /** + * The engine that is responsible for executing {@link StreamsQuery}s. + */ + private final QueryExecutor queryExecutor; + + /** + * How long blocking operations will be attempted before potentially trying again. + */ + private final long blockingValue; + + /** + * The units for {@link #blockingValue}. + */ + private final TimeUnit blockingUnits; + + /** + * Used to inform threads that the application is shutting down, so they must stop work. + */ + private final AtomicBoolean shutdownSignal = new AtomicBoolean(false); + + /** + * This thread pool manages the two thread used to work the {@link LogEvent}s + * and the {@link QueryEvent}s. + */ + private final ExecutorService executor = Executors.newFixedThreadPool(2); + + /** + * Creates a new {@link QueryManager}. + * + * @param queryExecutor - Runs the active {@link StreamsQuery}s. (not null) + * @param source - The {@link QueryChangeLogSource} of QueryChangeLogs. (not null) + * @param blockingValue - How long blocking operations will try before looping. (> 0) + * @param blockingUnits - The units of the {@code blockingValue}. (not null) + */ + public QueryManager( + final QueryExecutor queryExecutor, + final QueryChangeLogSource source, + final long blockingValue, + final TimeUnit blockingUnits) { + this.changeLogSource = requireNonNull(source); + this.queryExecutor = requireNonNull(queryExecutor); + Preconditions.checkArgument(blockingValue > 0, "The blocking value must be > 0. Was: " + blockingValue); + this.blockingValue = blockingValue; + this.blockingUnits = requireNonNull(blockingUnits); + } + + @Override + protected void doStart() { + log.info("Starting a QueryManager."); + + // A work queue of discovered Query Change Logs that need to be handled. + // This queue exists so that the source notifying thread may be released + // immediately instead of calling into blocking functions. + final BlockingQueue<LogEvent> logEvents = new ArrayBlockingQueue<>(1024); + + // A work queue of discovered Query Changes from the monitored Query Change Logs + // that need to be handled. This queue exists so that the Query Repository notifying + // thread may be released immediately instead of calling into blocking functions. + final BlockingQueue<QueryEvent> queryEvents = new ArrayBlockingQueue<>(1024); + + try { + // Start up a LogEventWorker using the executor service. + executor.submit(new LogEventWorker(logEvents, queryEvents, blockingValue, blockingUnits, shutdownSignal)); + + // Start up a QueryEvent Worker using the executor service. + executor.submit(new QueryEventWorker(queryEvents, queryExecutor, blockingValue, blockingUnits, shutdownSignal)); + + // Start up the query execution framework. + queryExecutor.startAndWait(); + + // Startup the source that discovers new Query Change Logs. + changeLogSource.startAndWait(); + + // Subscribe the source a listener that writes to the LogEventWorker's work queue. + changeLogSource.subscribe(new LogEventWorkGenerator(logEvents, blockingValue, blockingUnits, shutdownSignal)); + } catch(final RejectedExecutionException | UncheckedExecutionException e) { + log.error("Could not start up a QueryManager.", e); + notifyFailed(e); + } + + // Notify the service was successfully started. + notifyStarted(); + + log.info("QueryManager has finished starting."); + } + + @Override + protected void doStop() { + log.info("Stopping a QueryManager."); + + // Set the shutdown flag so that all components that rely on that signal will stop processing. + shutdownSignal.set(true); + + // Stop the workers and wait for them to die. + executor.shutdownNow(); + try { + if(!executor.awaitTermination(10, TimeUnit.SECONDS)) { + log.warn("Waited 10 seconds for the worker threads to die, but they are still running."); + } + } catch (final InterruptedException e) { + log.warn("Waited 10 seconds for the worker threads to die, but they are still running."); + } + + // Stop the source of new Change Logs. + try { + changeLogSource.stopAndWait(); + } catch(final UncheckedExecutionException e) { + log.warn("Could not stop the Change Log Source.", e); + } + + // Stop the query execution framework. + try { + queryExecutor.stopAndWait(); + } catch(final UncheckedExecutionException e) { + log.warn("Could not stop the Query Executor", e); + } + + // Notify the service was successfully stopped. + notifyStopped(); + + log.info("QueryManager has finished stopping."); + } + + /** + * Offer a unit of work to a blocking queue until it is either accepted, or the + * shutdown signal is set. + * + * @param workQueue - The blocking work queue to write to. (not null) + * @param event - The event that will be offered to the work queue. (not null) + * @param offerValue - How long to wait when offering new work. + * @param offerUnits - The unit for the {@code offerValue}. (not null) + * @param shutdownSignal - Used to signal application shutdown has started, so + * this method may terminate without ever placing the event on the queue. (not null) + * @return {@code true} if the evet nwas added to the queue, otherwise false. + */ + private static <T> boolean offerUntilAcceptedOrShutdown( + final BlockingQueue<T> workQueue, + final T event, final + long offerValue, + final TimeUnit offerUnits, + final AtomicBoolean shutdownSignal) { + requireNonNull(workQueue); + requireNonNull(event); + requireNonNull(shutdownSignal); + + boolean submitted = false; + while(!submitted && !shutdownSignal.get()) { + try { + submitted = workQueue.offer(event, offerValue, offerUnits); + if(!submitted) { + log.debug("An event could not be added to a work queue after waiting 5 seconds. Trying again..."); + } + } catch (final InterruptedException e) { + log.debug("An event could not be added to a work queue after waiting 5 seconds. Trying again..."); + } + } + return submitted; + } + + /** + * An observation that a {@link QueryChangeLog} was created within or + * removed from a {@link QueryChangeLogSource}. + */ + @DefaultAnnotation(NonNull.class) + static class LogEvent { + + /** + * The types of events that may be observed. + */ + static enum LogEventType { + /** + * A {@link QueryChangeLog} was created within a {@link QueryChangeLogSource}. + */ + CREATE, + + /** + * A {@link QueryChangeLog} was deleted from a {@link QueryChangeLogSource}. + */ + DELETE; + } + + private final String ryaInstance; + private final LogEventType eventType; + private final Optional<QueryChangeLog> log; + + /** + * Constructs an instance of {@link LogEvent}. + * + * @param ryaInstance - The Rya Instance the log is/was for. (not null) + * @param eventType - The type of event that was observed. (not null) + * @param log - The log if this is a create event. (not null) + */ + private LogEvent(final String ryaInstance, final LogEventType eventType, final Optional<QueryChangeLog> log) { + this.ryaInstance = requireNonNull(ryaInstance); + this.eventType = requireNonNull(eventType); + this.log = requireNonNull(log); + } + + /** + * @return The Rya Instance whose log was either created or deleted. + */ + public String getRyaInstanceName() { + return ryaInstance; + } + + /** + * @return The type of event that was observed. + */ + public LogEventType getEventType() { + return eventType; + } + + /** + * @return The {@link QueryChangeLog} if this is a CREATE event. + */ + public Optional<QueryChangeLog> getQueryChangeLog() { + return log; + } + + @Override + public String toString() { + return "LogEvent {\n" + + " Rya Instance: " + ryaInstance + ",\n" + + " Event Type: " + eventType + "\n" + + "}"; + } + + /** + * Make a {@link LogEvent} that indicates a {@link QueryChangeLog} was created within a + * {@link QueryChangeLogSource}. + * + * @param ryaInstance - The Rya Instance the created log is for. (not null) + * @param log - The created {@link QueryChangeLog. (not null) + * @return A {@link LogEvent} built using the provided values. + */ + public static LogEvent create(final String ryaInstance, final QueryChangeLog log) { + return new LogEvent(ryaInstance, LogEventType.CREATE ,Optional.of(log)); + } + + /** + * Make a {@link LogEvent} that indicates a {@link QueryChangeLog} was deleted from + * a {@link QueryChangeLogSource}. + * + * @param ryaInstance - The Rya Instance whose log was deleted. (not null) + * @return A {@link LogEvent} built using the provided values. + */ + public static LogEvent delete(final String ryaInstance) { + return new LogEvent(ryaInstance, LogEventType.DELETE, Optional.empty()); + } + } + + /** + * An observation that a {@link StreamsQuery} needs to be executing or not + * via the provided {@link QueryExecutor}. + */ + @DefaultAnnotation(NonNull.class) + static class QueryEvent { + + /** + * The type of events that may be observed. + */ + public static enum QueryEventType { + /** + * Indicates a {@link StreamsQuery} needs to be executing. + */ + EXECUTING, + + /** + * Indicates a {@link StreamsQuery} needs to be stopped. + */ + STOPPED, + + /** + * Indicates all {@link StreamsQuery}s for a Rya instance need to be stopped. + */ + STOP_ALL; + } + + private final String ryaInstance; + private final QueryEventType type; + private final Optional<UUID> queryId; + private final Optional<StreamsQuery> query; + + /** + * Constructs an instance of {@link QueryEvent}. + * + * @param ryaInstance - The Rya instance that generated the event. (not null) + * @param type - Indicates whether the query needs to be executing or not. (not null) + * @param queryId - If stopped, the ID of the query that must not be running. (not null) + * @param query - If executing, the StreamsQuery that defines what should be executing. (not null) + */ + private QueryEvent( + final String ryaInstance, + final QueryEventType type, + final Optional<UUID> queryId, + final Optional<StreamsQuery> query) { + this.ryaInstance = requireNonNull(ryaInstance); + this.type = requireNonNull(type); + this.queryId = requireNonNull(queryId); + this.query = requireNonNull(query); + } + + /** + * @return The Rya instance that generated the event. + */ + public String getRyaInstance() { + return ryaInstance; + } + + /** + * @return Indicates whether the query needs to be executing or not. + */ + public QueryEventType getType() { + return type; + } + + /** + * @return If stopped, the ID of the query that must not be running. Otherwise absent. + */ + public Optional<UUID> getQueryId() { + return queryId; + } + + /** + * @return If executing, the StreamsQuery that defines what should be executing. Otherwise absent. + */ + public Optional<StreamsQuery> getStreamsQuery() { + return query; + } + + @Override + public int hashCode() { + return Objects.hash(ryaInstance, type, queryId, query); + } + + @Override + public boolean equals(final Object o) { + if(o instanceof QueryEvent) { + final QueryEvent other = (QueryEvent) o; + return Objects.equals(ryaInstance, other.ryaInstance) && + Objects.equals(type, other.type) && + Objects.equals(queryId, other.queryId) && + Objects.equals(query, other.query); + } + return false; + } + + @Override + public String toString() { + final StringBuilder string = new StringBuilder(); + string.append("Query Event {\n") + .append(" Rya Instance:").append(ryaInstance).append(",\n") + .append(" Type: ").append(type).append(",\n"); + switch(type) { + case EXECUTING: + append(string, query.get()); + break; + case STOPPED: + string.append(" Query ID: ").append(queryId.get()).append("\n"); + break; + case STOP_ALL: + break; + default: + // Default to showing everything that is in the object. + string.append(" Query ID: ").append(queryId.get()).append("\n"); + append(string, query.get()); + break; + } + string.append("}"); + return string.toString(); + } + + private void append(final StringBuilder string, final StreamsQuery query) { + requireNonNull(string); + requireNonNull(query); + string.append(" Streams Query {\n") + .append(" Query ID: ").append(query.getQueryId()).append(",\n") + .append(" Is Active: ").append(query.isActive()).append(",\n") + .append(" SPARQL: ").append(query.getSparql()).append("\n") + .append(" }"); + } + + /** + * Create a {@link QueryEvent} that indicates a query needs to be executing. + * + * @param ryaInstance - The Rya instance that generated the event. (not null) + * @param query - The StreamsQuery that defines what should be executing. (not null) + * @return A {@link QueryEvent} built using the provided values. + */ + public static QueryEvent executing(final String ryaInstance, final StreamsQuery query) { + return new QueryEvent(ryaInstance, QueryEventType.EXECUTING, Optional.empty(), Optional.of(query)); + } + + /** + * Create a {@link QueryEvent} that indicates a query needs to be stopped. + * + * @param ryaInstance - The Rya instance that generated the event. (not null) + * @param queryId - The ID of the query that must not be running. (not null) + * @return A {@link QueryEvent} built using the provided values. + */ + public static QueryEvent stopped(final String ryaInstance, final UUID queryId) { + return new QueryEvent(ryaInstance, QueryEventType.STOPPED, Optional.of(queryId), Optional.empty()); + } + + /** + * Create a {@link QueryEvent} that indicates all queries for a Rya instance needs to be stopped. + * + * @param ryaInstance - The Rya instance that generated the event. (not null) + * @return A {@link QueryEvent} built using the provided values. + */ + public static QueryEvent stopALL(final String ryaInstance) { + return new QueryEvent(ryaInstance, QueryEventType.STOP_ALL, Optional.empty(), Optional.empty()); + } + } + + /** + * Listens to a {@link QueryChangeLogSource} and adds observations to the provided + * work queue. It does so until the provided shutdown signal is set. + */ + @DefaultAnnotation(NonNull.class) + static class LogEventWorkGenerator implements SourceListener { + + private final BlockingQueue<LogEvent> workQueue; + private final AtomicBoolean shutdownSignal; + private final long offerValue; + private final TimeUnit offerUnits; + + /** + * Constructs an instance of {@link QueryManagerSourceListener}. + * + * @param workQueue - A blocking queue that will have {@link LogEvent}s offered to it. (not null) + * @param offerValue - How long to wait when offering new work. + * @param offerUnits - The unit for the {@code offerValue}. (not null) + * @param shutdownSignal - Indicates to this listener that it needs to stop adding events + * to the work queue because the application is shutting down. (not null) + */ + public LogEventWorkGenerator( + final BlockingQueue<LogEvent> workQueue, + final long offerValue, + final TimeUnit offerUnits, + final AtomicBoolean shutdownSignal) { + this.workQueue = requireNonNull(workQueue); + this.shutdownSignal = requireNonNull(shutdownSignal); + this.offerValue = offerValue; + this.offerUnits = requireNonNull(offerUnits); + } + + @Override + public void notifyCreate(final String ryaInstanceName, final QueryChangeLog changeLog) { + log.info("A new Query Change Log has been discovered for Rya Instance " + ryaInstanceName + ". All " + + "queries that are set to active within it will be started."); + + // Create an event that summarizes this notification. + final LogEvent event = LogEvent.create(ryaInstanceName, changeLog); + + // Offer it to the worker until there is room for it in the work queue, or we are shutting down. + offerUntilAcceptedOrShutdown(workQueue, event, offerValue, offerUnits, shutdownSignal); + } + + @Override + public void notifyDelete(final String ryaInstanceName) { + log.info("The Query Change Log for Rya Instance " + ryaInstanceName + " has been deleted. All of the " + + "queries related to that instance will be stopped."); + + // Create an event that summarizes this notification. + final LogEvent event = LogEvent.delete(ryaInstanceName); + + // Offer it to the worker until there is room for it in the work queue, or we are shutting down. + offerUntilAcceptedOrShutdown(workQueue, event, offerValue, offerUnits, shutdownSignal); + } + } + + /** + * Processes a work queue of {@link LogEvent}s. + * <p/> + * Whenever a new log has been created, then it registers a {@link QueryEventWorkGenerator} + * that generates {@link QueryEvent}s based on the content and updates to the discovered + * {@link QueryChagneLog}. + * <p/> + * Whenever a log is deleted, then the generator is stopped and a stop all {@link QueryEvent} + * is written to the work queue. + */ + @DefaultAnnotation(NonNull.class) + static class LogEventWorker implements Runnable { + + /** + * A map of Rya Instance name to he Query Repository for that instance. + */ + private final Map<String, QueryRepository> repos = new HashMap<>(); + + private final BlockingQueue<LogEvent> logWorkQueue; + private final BlockingQueue<QueryEvent> queryWorkQueue; + private final long blockingValue; + private final TimeUnit blockingUnits; + private final AtomicBoolean shutdownSignal; + + /** + * Constructs an instance of {@link LogEventWorker}. + * + * @param logWorkQueue - A queue of {@link LogEvent}s that will be worked by this object. (not null) + * @param queryWorkQueue - A queue where {@link QueryEvent}s will be placed by this object. (not null) + * @param blockingValue - How long to wait when polling/offering new work. + * @param blockingUnits - The unit for the {@code blockingValue}. (not null) + * @param shutdownSignal - Indicates when the application has been shutdown, so the executing thread + * may exit the {@link #run()} method. (not null) + */ + public LogEventWorker( + final BlockingQueue<LogEvent> logWorkQueue, + final BlockingQueue<QueryEvent> queryWorkQueue, + final long blockingValue, + final TimeUnit blockingUnits, + final AtomicBoolean shutdownSignal) { + this.logWorkQueue = requireNonNull(logWorkQueue); + this.queryWorkQueue = requireNonNull(queryWorkQueue); + this.blockingValue = blockingValue; + this.blockingUnits = requireNonNull(blockingUnits); + this.shutdownSignal = requireNonNull(shutdownSignal); + } + + @Override + public void run() { + // Run until the shutdown signal is set. + while(!shutdownSignal.get()) { + try { + // Pull a unit of work from the queue. + log.debug("LogEventWorker - Polling the work queue for a new LogEvent."); + final LogEvent logEvent = logWorkQueue.poll(blockingValue, blockingUnits); + if(logEvent == null) { + // Poll again if nothing was found. + continue; + } + + log.info("LogEventWorker - handling: \n" + logEvent); + final String ryaInstance = logEvent.getRyaInstanceName(); + + switch(logEvent.getEventType()) { + case CREATE: + // If we see a create message for a Rya Instance we are already maintaining, + // then don't do anything. + if(repos.containsKey(ryaInstance)) { + log.warn("LogEventWorker - A repository is already being managed for the Rya Instance " + + ryaInstance + ". This message will be ignored."); + continue; + } + + // Create and start a QueryRepository for the discovered log. Hold onto the repository + // so that it may be shutdown later. + final Scheduler scheduler = Scheduler.newFixedRateSchedule(0, blockingValue, blockingUnits); + final QueryRepository repo = new InMemoryQueryRepository(logEvent.getQueryChangeLog().get(), scheduler); + repo.startAndWait(); + repos.put(ryaInstance, repo); + + // Subscribe a worker that adds the Query Events to the queryWorkQueue queue. + // A count down latch is used to ensure the returned set of queries are handled + // prior to any notifications from the repository. + final CountDownLatch subscriptionWorkFinished = new CountDownLatch(1); + final QueryEventWorkGenerator queryWorkGenerator = + new QueryEventWorkGenerator(ryaInstance, subscriptionWorkFinished, queryWorkQueue, + blockingValue, blockingUnits, shutdownSignal); + + log.debug("LogEventWorker - Setting up a QueryWorkGenerator..."); + final Set<StreamsQuery> queries = repo.subscribe(queryWorkGenerator); + log.debug("LogEventWorker - Finished setting up a QueryWorkGenerator."); + + // Handle the view of the queries within the repository as it existed when + // the subscription was registered. + queries.stream() + .forEach(query -> { + // Create a QueryEvent that represents the active state of the existing query. + final QueryEvent queryEvent = query.isActive() ? + QueryEvent.executing(ryaInstance, query) : QueryEvent.stopped(ryaInstance, query.getQueryId()); + log.debug("LogEventWorker - offering: " + queryEvent); + + // Offer it to the worker until there is room for it in the work queue, or we are shutting down. + offerUntilAcceptedOrShutdown(queryWorkQueue, queryEvent, blockingValue, blockingUnits, shutdownSignal); + }); + + // Indicate the subscription work is finished so that the registered listener may start + // adding work to the queue. + log.info("LogEventWorker - Counting down the subscription work latch."); + subscriptionWorkFinished.countDown(); + break; + + case DELETE: + if(repos.containsKey(ryaInstance)) { + // Shut down the query repository for the Rya instance. This ensures the listener will + // not receive any more work that needs to be done. + final QueryRepository deletedRepo = repos.remove(ryaInstance); + deletedRepo.stopAndWait(); + + // Add work that stops all of the queries related to the instance. + final QueryEvent stopAllEvent = QueryEvent.stopALL(ryaInstance); + offerUntilAcceptedOrShutdown(queryWorkQueue, stopAllEvent, blockingValue, blockingUnits, shutdownSignal); + } + break; + } + } catch (final InterruptedException e) { + log.debug("LogEventWorker did not see any new events over the past 5 seconds. Polling again..."); + } + } + + log.info("LogEventWorker shutting down..."); + + // Shutdown all of the QueryRepositories that were started. + repos.values().forEach(repo -> repo.stopAndWait()); + + log.info("LogEventWorker shut down."); + } + } + + /** + * Listens to a {@link QueryRepository} and adds observations to the provided work queue. + * It does so until the provided shutdown signal is set. + */ + @DefaultAnnotation(NonNull.class) + static class QueryEventWorkGenerator implements QueryChangeLogListener { + + private final String ryaInstance; + private final CountDownLatch subscriptionWorkFinished; + private final BlockingQueue<QueryEvent> queryWorkQueue; + private final long blockingValue; + private final TimeUnit blockingUnits; + private final AtomicBoolean shutdownSignal; + + /** + * Constructs an instance of {@link QueryEventWorkGenerator}. + * + * @param ryaInstance - The rya instance whose log this objects is watching. (not null) + * @param subscriptionWorkFinished - Indicates when work that needs to be completed before this + * listener handles notifications is completed. (not null) + * @param queryWorkQueue - A queue where {@link QueryEvent}s will be placed by this object. (not null) + * @param blockingValue - How long to wait when polling/offering new work. + * @param blockingUnits - The unit for the {@code blockingValue}. (not null) + * @param shutdownSignal - Indicates to this listener that it needs to stop adding events + * to the work queue because the application is shutting down. (not null) + */ + public QueryEventWorkGenerator( + final String ryaInstance, + final CountDownLatch subscriptionWorkFinished, + final BlockingQueue<QueryEvent> queryWorkQueue, + final long blockingValue, + final TimeUnit blockingUnits, + final AtomicBoolean shutdownSignal) { + this.ryaInstance = requireNonNull(ryaInstance); + this.subscriptionWorkFinished = requireNonNull(subscriptionWorkFinished); + this.queryWorkQueue = requireNonNull(queryWorkQueue); + this.blockingValue = blockingValue; + this.blockingUnits = requireNonNull(blockingUnits); + this.shutdownSignal = requireNonNull(shutdownSignal); + } + + @Override + public void notify(final ChangeLogEntry<QueryChange> queryChangeEvent, final Optional<StreamsQuery> newQueryState) { + requireNonNull(queryChangeEvent); + requireNonNull(newQueryState); + + // Wait for the subscription work to be finished. + try { + log.debug("Waiting for Subscription Work Finished latch to release..."); + while(!shutdownSignal.get() && !subscriptionWorkFinished.await(blockingValue, blockingUnits)) { + log.debug("Still waiting..."); + } + log.debug("Subscription Work Finished latch to released."); + } catch (final InterruptedException e) { + log.warn("Interrupted while waiting for the Subscription Work Finished latch to be " + + "released. Shutting down?", e); + } + + // If we left the loop because of a shutdown, return immediately. + if(shutdownSignal.get()) { + log.debug("Not processing notification. Shutting down."); + return; + } + + // Generate work from the notification. + final QueryChange change = queryChangeEvent.getEntry(); + switch(change.getChangeType()) { + case CREATE: + if(newQueryState.isPresent()) { + log.info("Rya Instance " + ryaInstance + " created Rya Streams query " + newQueryState + "."); + final StreamsQuery newQuery = newQueryState.get(); + if(newQuery.isActive()) { + final QueryEvent executeNewQuery = QueryEvent.executing(ryaInstance, newQuery); + offerUntilAcceptedOrShutdown(queryWorkQueue, executeNewQuery, blockingValue, blockingUnits, shutdownSignal); + } + } else { + log.error("Received a CREATE QueryChange for Rya Instance: " + ryaInstance + + ", Query ID: " + change.getQueryId() + ", but the QueryRepository did not supply a " + + "StreamsQuery representing the created query. The query will not be processed."); + } + break; + + case DELETE: + final UUID deletedQueryId = change.getQueryId(); + log.info("Rya Instance " + ryaInstance + " deleted Rya Streams query with ID " + deletedQueryId); + final QueryEvent stopDeletedQuery = QueryEvent.stopped(ryaInstance, deletedQueryId); + offerUntilAcceptedOrShutdown(queryWorkQueue, stopDeletedQuery, blockingValue, blockingUnits, shutdownSignal); + break; + + case UPDATE: + if(newQueryState.isPresent()) { + final StreamsQuery updatedQuery = newQueryState.get(); + if(updatedQuery.isActive()) { + log.info("Rya Instance " + ryaInstance + " updated Rya Streams query with ID " + + updatedQuery.getQueryId() + " to be active."); + final QueryEvent executeUpdatedQuery = QueryEvent.executing(ryaInstance, updatedQuery); + offerUntilAcceptedOrShutdown(queryWorkQueue, executeUpdatedQuery, blockingValue, blockingUnits, shutdownSignal); + } else { + log.info("Rya Instance " + ryaInstance + " updated Rya Streams query with ID " + + updatedQuery.getQueryId() + " to be inactive."); + final QueryEvent stopUpdatedQuery = QueryEvent.stopped(ryaInstance, updatedQuery.getQueryId()); + offerUntilAcceptedOrShutdown(queryWorkQueue, stopUpdatedQuery, blockingValue, blockingUnits, shutdownSignal); + } + } else { + log.error("Received an UPDATE QueryChange for Rya Instance: " + ryaInstance + + ", Query ID: " + change.getQueryId() + ", but the QueryRepository did not supply a " + + "StreamsQuery representing the created query. The query will not be processed."); + } + break; + } + } + } + + /** + * Processes a work queue of {@link QueryEvent}s. + * <p/> + * Each type of event maps the to corresponding method on {@link QueryExecutor} that is called into. + */ + @DefaultAnnotation(NonNull.class) + static class QueryEventWorker implements Runnable { + + private final BlockingQueue<QueryEvent> workQueue; + private final QueryExecutor queryExecutor; + private final long pollingValue; + private final TimeUnit pollingUnits; + private final AtomicBoolean shutdownSignal; + + /** + * Constructs an instance of {@link QueryEventWorker}. + * + * @param logWorkQueue - A queue of {@link QueryEvent}s that will be worked by this object. (not null) --- End diff -- the @param name doesn't match
---