Github user ejwhite922 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/272#discussion_r167240341 --- Diff: extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryManager.java --- @@ -0,0 +1,884 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.streams.querymanager; + +import static java.util.Objects.requireNonNull; + +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.rya.streams.api.entity.StreamsQuery; +import org.apache.rya.streams.api.queries.ChangeLogEntry; +import org.apache.rya.streams.api.queries.InMemoryQueryRepository; +import org.apache.rya.streams.api.queries.QueryChange; +import org.apache.rya.streams.api.queries.QueryChangeLog; +import org.apache.rya.streams.api.queries.QueryChangeLogListener; +import org.apache.rya.streams.api.queries.QueryRepository; +import org.apache.rya.streams.querymanager.QueryChangeLogSource.SourceListener; +import org.apache.rya.streams.querymanager.QueryExecutor.QueryExecutorException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.AbstractScheduledService.Scheduler; +import com.google.common.util.concurrent.AbstractService; +import com.google.common.util.concurrent.UncheckedExecutionException; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * A service for managing {@link StreamsQuery} running on a Rya Streams system. + * <p> + * Only one QueryManager needs to be running to manage any number of rya + * instances/rya streams instances. + */ +@DefaultAnnotation(NonNull.class) +public class QueryManager extends AbstractService { + private static final Logger log = LoggerFactory.getLogger(QueryManager.class); + + /** + * The source of {@link QueryChangeLog}s. Each log discovered is bound to a specific + * Rya instnace. + */ + private final QueryChangeLogSource changeLogSource; + + /** + * The engine that is responsible for executing {@link StreamsQuery}s. + */ + private final QueryExecutor queryExecutor; + + /** + * How long blocking operations will be attempted before potentially trying again. + */ + private final long blockingValue; + + /** + * The units for {@link #blockingValue}. + */ + private final TimeUnit blockingUnits; + + /** + * Used to inform threads that the application is shutting down, so they must stop work. + */ + private final AtomicBoolean shutdownSignal = new AtomicBoolean(false); + + /** + * This thread pool manages the two thread used to work the {@link LogEvent}s + * and the {@link QueryEvent}s. + */ + private final ExecutorService executor = Executors.newFixedThreadPool(2); + + /** + * Creates a new {@link QueryManager}. + * + * @param queryExecutor - Runs the active {@link StreamsQuery}s. (not null) + * @param source - The {@link QueryChangeLogSource} of QueryChangeLogs. (not null) + * @param blockingValue - How long blocking operations will try before looping. (> 0) + * @param blockingUnits - The units of the {@code blockingValue}. (not null) + */ + public QueryManager( + final QueryExecutor queryExecutor, + final QueryChangeLogSource source, + final long blockingValue, + final TimeUnit blockingUnits) { + this.changeLogSource = requireNonNull(source); + this.queryExecutor = requireNonNull(queryExecutor); + Preconditions.checkArgument(blockingValue > 0, "The blocking value must be > 0. Was: " + blockingValue); + this.blockingValue = blockingValue; + this.blockingUnits = requireNonNull(blockingUnits); + } + + @Override + protected void doStart() { + log.info("Starting a QueryManager."); + + // A work queue of discovered Query Change Logs that need to be handled. + // This queue exists so that the source notifying thread may be released + // immediately instead of calling into blocking functions. + final BlockingQueue<LogEvent> logEvents = new ArrayBlockingQueue<>(1024); + + // A work queue of discovered Query Changes from the monitored Query Change Logs + // that need to be handled. This queue exists so that the Query Repository notifying + // thread may be released immediately instead of calling into blocking functions. + final BlockingQueue<QueryEvent> queryEvents = new ArrayBlockingQueue<>(1024); + + try { + // Start up a LogEventWorker using the executor service. + executor.submit(new LogEventWorker(logEvents, queryEvents, blockingValue, blockingUnits, shutdownSignal)); + + // Start up a QueryEvent Worker using the executor service. + executor.submit(new QueryEventWorker(queryEvents, queryExecutor, blockingValue, blockingUnits, shutdownSignal)); + + // Start up the query execution framework. + queryExecutor.startAndWait(); + + // Startup the source that discovers new Query Change Logs. + changeLogSource.startAndWait(); + + // Subscribe the source a listener that writes to the LogEventWorker's work queue. + changeLogSource.subscribe(new LogEventWorkGenerator(logEvents, blockingValue, blockingUnits, shutdownSignal)); + } catch(final RejectedExecutionException | UncheckedExecutionException e) { + log.error("Could not start up a QueryManager.", e); + notifyFailed(e); + } + + // Notify the service was successfully started. + notifyStarted(); + + log.info("QueryManager has finished starting."); + } + + @Override + protected void doStop() { + log.info("Stopping a QueryManager."); + + // Set the shutdown flag so that all components that rely on that signal will stop processing. + shutdownSignal.set(true); + + // Stop the workers and wait for them to die. + executor.shutdownNow(); + try { + if(!executor.awaitTermination(10, TimeUnit.SECONDS)) { + log.warn("Waited 10 seconds for the worker threads to die, but they are still running."); + } + } catch (final InterruptedException e) { + log.warn("Waited 10 seconds for the worker threads to die, but they are still running."); + } + + // Stop the source of new Change Logs. + try { + changeLogSource.stopAndWait(); + } catch(final UncheckedExecutionException e) { + log.warn("Could not stop the Change Log Source.", e); + } + + // Stop the query execution framework. + try { + queryExecutor.stopAndWait(); + } catch(final UncheckedExecutionException e) { + log.warn("Could not stop the Query Executor", e); + } + + // Notify the service was successfully stopped. + notifyStopped(); + + log.info("QueryManager has finished stopping."); + } + + /** + * Offer a unit of work to a blocking queue until it is either accepted, or the + * shutdown signal is set. + * + * @param workQueue - The blocking work queue to write to. (not null) + * @param event - The event that will be offered to the work queue. (not null) + * @param offerValue - How long to wait when offering new work. + * @param offerUnits - The unit for the {@code offerValue}. (not null) + * @param shutdownSignal - Used to signal application shutdown has started, so + * this method may terminate without ever placing the event on the queue. (not null) + * @return {@code true} if the evet nwas added to the queue, otherwise false. + */ + private static <T> boolean offerUntilAcceptedOrShutdown( + final BlockingQueue<T> workQueue, + final T event, final + long offerValue, --- End diff -- final and long are on separate lines
---