Github user ejwhite922 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/272#discussion_r167241732 --- Diff: extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryManager.java --- @@ -0,0 +1,884 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.streams.querymanager; + +import static java.util.Objects.requireNonNull; + +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.rya.streams.api.entity.StreamsQuery; +import org.apache.rya.streams.api.queries.ChangeLogEntry; +import org.apache.rya.streams.api.queries.InMemoryQueryRepository; +import org.apache.rya.streams.api.queries.QueryChange; +import org.apache.rya.streams.api.queries.QueryChangeLog; +import org.apache.rya.streams.api.queries.QueryChangeLogListener; +import org.apache.rya.streams.api.queries.QueryRepository; +import org.apache.rya.streams.querymanager.QueryChangeLogSource.SourceListener; +import org.apache.rya.streams.querymanager.QueryExecutor.QueryExecutorException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.AbstractScheduledService.Scheduler; +import com.google.common.util.concurrent.AbstractService; +import com.google.common.util.concurrent.UncheckedExecutionException; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * A service for managing {@link StreamsQuery} running on a Rya Streams system. + * <p> + * Only one QueryManager needs to be running to manage any number of rya + * instances/rya streams instances. 
+ */ +@DefaultAnnotation(NonNull.class) +public class QueryManager extends AbstractService { + private static final Logger log = LoggerFactory.getLogger(QueryManager.class); + + /** + * The source of {@link QueryChangeLog}s. Each log discovered is bound to a specific + * Rya instance. + */ + private final QueryChangeLogSource changeLogSource; + + /** + * The engine that is responsible for executing {@link StreamsQuery}s. + */ + private final QueryExecutor queryExecutor; + + /** + * How long blocking operations will be attempted before potentially trying again. + */ + private final long blockingValue; + + /** + * The units for {@link #blockingValue}. + */ + private final TimeUnit blockingUnits; + + /** + * Used to inform threads that the application is shutting down, so they must stop work. + */ + private final AtomicBoolean shutdownSignal = new AtomicBoolean(false); + + /** + * This thread pool manages the two threads used to work the {@link LogEvent}s + * and the {@link QueryEvent}s. + */ + private final ExecutorService executor = Executors.newFixedThreadPool(2); + + /** + * Creates a new {@link QueryManager}. + * + * @param queryExecutor - Runs the active {@link StreamsQuery}s. (not null) + * @param source - The {@link QueryChangeLogSource} of QueryChangeLogs. (not null) + * @param blockingValue - How long blocking operations will try before looping. (> 0) + * @param blockingUnits - The units of the {@code blockingValue}. (not null) + */ + public QueryManager( + final QueryExecutor queryExecutor, + final QueryChangeLogSource source, + final long blockingValue, + final TimeUnit blockingUnits) { + this.changeLogSource = requireNonNull(source); + this.queryExecutor = requireNonNull(queryExecutor); + Preconditions.checkArgument(blockingValue > 0, "The blocking value must be > 0. 
Was: " + blockingValue); + this.blockingValue = blockingValue; + this.blockingUnits = requireNonNull(blockingUnits); + } + + @Override + protected void doStart() { + log.info("Starting a QueryManager."); + + // A work queue of discovered Query Change Logs that need to be handled. + // This queue exists so that the source notifying thread may be released + // immediately instead of calling into blocking functions. + final BlockingQueue<LogEvent> logEvents = new ArrayBlockingQueue<>(1024); + + // A work queue of discovered Query Changes from the monitored Query Change Logs + // that need to be handled. This queue exists so that the Query Repository notifying + // thread may be released immediately instead of calling into blocking functions. + final BlockingQueue<QueryEvent> queryEvents = new ArrayBlockingQueue<>(1024); + + try { + // Start up a LogEventWorker using the executor service. + executor.submit(new LogEventWorker(logEvents, queryEvents, blockingValue, blockingUnits, shutdownSignal)); + + // Start up a QueryEvent Worker using the executor service. + executor.submit(new QueryEventWorker(queryEvents, queryExecutor, blockingValue, blockingUnits, shutdownSignal)); + + // Start up the query execution framework. + queryExecutor.startAndWait(); + + // Startup the source that discovers new Query Change Logs. + changeLogSource.startAndWait(); + + // Subscribe the source a listener that writes to the LogEventWorker's work queue. + changeLogSource.subscribe(new LogEventWorkGenerator(logEvents, blockingValue, blockingUnits, shutdownSignal)); + } catch(final RejectedExecutionException | UncheckedExecutionException e) { + log.error("Could not start up a QueryManager.", e); + notifyFailed(e); + } + + // Notify the service was successfully started. 
+ notifyStarted(); + + log.info("QueryManager has finished starting."); + } + + @Override + protected void doStop() { + log.info("Stopping a QueryManager."); + + // Set the shutdown flag so that all components that rely on that signal will stop processing. + shutdownSignal.set(true); + + // Stop the workers and wait for them to die. + executor.shutdownNow(); + try { + if(!executor.awaitTermination(10, TimeUnit.SECONDS)) { + log.warn("Waited 10 seconds for the worker threads to die, but they are still running."); + } + } catch (final InterruptedException e) { + log.warn("Waited 10 seconds for the worker threads to die, but they are still running."); + } + + // Stop the source of new Change Logs. + try { + changeLogSource.stopAndWait(); + } catch(final UncheckedExecutionException e) { + log.warn("Could not stop the Change Log Source.", e); + } + + // Stop the query execution framework. + try { + queryExecutor.stopAndWait(); + } catch(final UncheckedExecutionException e) { + log.warn("Could not stop the Query Executor", e); + } + + // Notify the service was successfully stopped. + notifyStopped(); + + log.info("QueryManager has finished stopping."); + } + + /** + * Offer a unit of work to a blocking queue until it is either accepted, or the + * shutdown signal is set. + * + * @param workQueue - The blocking work queue to write to. (not null) + * @param event - The event that will be offered to the work queue. (not null) + * @param offerValue - How long to wait when offering new work. + * @param offerUnits - The unit for the {@code offerValue}. (not null) + * @param shutdownSignal - Used to signal application shutdown has started, so + * this method may terminate without ever placing the event on the queue. (not null) + * @return {@code true} if the event was added to the queue, otherwise false. 
+ */ + private static <T> boolean offerUntilAcceptedOrShutdown( + final BlockingQueue<T> workQueue, + final T event, final + long offerValue, + final TimeUnit offerUnits, + final AtomicBoolean shutdownSignal) { + requireNonNull(workQueue); + requireNonNull(event); + requireNonNull(shutdownSignal); + + boolean submitted = false; + while(!submitted && !shutdownSignal.get()) { + try { + submitted = workQueue.offer(event, offerValue, offerUnits); + if(!submitted) { + log.debug("An event could not be added to a work queue after waiting 5 seconds. Trying again..."); + } + } catch (final InterruptedException e) { + log.debug("An event could not be added to a work queue after waiting 5 seconds. Trying again..."); + } + } + return submitted; + } + + /** + * An observation that a {@link QueryChangeLog} was created within or + * removed from a {@link QueryChangeLogSource}. + */ + @DefaultAnnotation(NonNull.class) + static class LogEvent { + + /** + * The types of events that may be observed. + */ + static enum LogEventType { + /** + * A {@link QueryChangeLog} was created within a {@link QueryChangeLogSource}. + */ + CREATE, + + /** + * A {@link QueryChangeLog} was deleted from a {@link QueryChangeLogSource}. + */ + DELETE; + } + + private final String ryaInstance; + private final LogEventType eventType; + private final Optional<QueryChangeLog> log; + + /** + * Constructs an instance of {@link LogEvent}. + * + * @param ryaInstance - The Rya Instance the log is/was for. (not null) + * @param eventType - The type of event that was observed. (not null) + * @param log - The log if this is a create event. (not null) + */ + private LogEvent(final String ryaInstance, final LogEventType eventType, final Optional<QueryChangeLog> log) { + this.ryaInstance = requireNonNull(ryaInstance); + this.eventType = requireNonNull(eventType); + this.log = requireNonNull(log); + } + + /** + * @return The Rya Instance whose log was either created or deleted. 
+ */ + public String getRyaInstanceName() { + return ryaInstance; + } + + /** + * @return The type of event that was observed. + */ + public LogEventType getEventType() { + return eventType; + } + + /** + * @return The {@link QueryChangeLog} if this is a CREATE event. + */ + public Optional<QueryChangeLog> getQueryChangeLog() { + return log; + } + + @Override + public String toString() { + return "LogEvent {\n" + + " Rya Instance: " + ryaInstance + ",\n" + + " Event Type: " + eventType + "\n" + + "}"; + } + + /** + * Make a {@link LogEvent} that indicates a {@link QueryChangeLog} was created within a + * {@link QueryChangeLogSource}. + * + * @param ryaInstance - The Rya Instance the created log is for. (not null) + * @param log - The created {@link QueryChangeLog}. (not null) + * @return A {@link LogEvent} built using the provided values. + */ + public static LogEvent create(final String ryaInstance, final QueryChangeLog log) { + return new LogEvent(ryaInstance, LogEventType.CREATE ,Optional.of(log)); + } + + /** + * Make a {@link LogEvent} that indicates a {@link QueryChangeLog} was deleted from + * a {@link QueryChangeLogSource}. + * + * @param ryaInstance - The Rya Instance whose log was deleted. (not null) + * @return A {@link LogEvent} built using the provided values. + */ + public static LogEvent delete(final String ryaInstance) { + return new LogEvent(ryaInstance, LogEventType.DELETE, Optional.empty()); + } + } + + /** + * An observation that a {@link StreamsQuery} needs to be executing or not + * via the provided {@link QueryExecutor}. + */ + @DefaultAnnotation(NonNull.class) + static class QueryEvent { + + /** + * The type of events that may be observed. + */ + public static enum QueryEventType { + /** + * Indicates a {@link StreamsQuery} needs to be executing. + */ + EXECUTING, + + /** + * Indicates a {@link StreamsQuery} needs to be stopped. + */ + STOPPED, + + /** + * Indicates all {@link StreamsQuery}s for a Rya instance need to be stopped. 
+ */ + STOP_ALL; + } + + private final String ryaInstance; + private final QueryEventType type; + private final Optional<UUID> queryId; + private final Optional<StreamsQuery> query; + + /** + * Constructs an instance of {@link QueryEvent}. + * + * @param ryaInstance - The Rya instance that generated the event. (not null) + * @param type - Indicates whether the query needs to be executing or not. (not null) + * @param queryId - If stopped, the ID of the query that must not be running. (not null) + * @param query - If executing, the StreamsQuery that defines what should be executing. (not null) + */ + private QueryEvent( + final String ryaInstance, + final QueryEventType type, + final Optional<UUID> queryId, + final Optional<StreamsQuery> query) { + this.ryaInstance = requireNonNull(ryaInstance); + this.type = requireNonNull(type); + this.queryId = requireNonNull(queryId); + this.query = requireNonNull(query); + } + + /** + * @return The Rya instance that generated the event. + */ + public String getRyaInstance() { + return ryaInstance; + } + + /** + * @return Indicates whether the query needs to be executing or not. + */ + public QueryEventType getType() { + return type; + } + + /** + * @return If stopped, the ID of the query that must not be running. Otherwise absent. + */ + public Optional<UUID> getQueryId() { + return queryId; + } + + /** + * @return If executing, the StreamsQuery that defines what should be executing. Otherwise absent. 
+ */ + public Optional<StreamsQuery> getStreamsQuery() { + return query; + } + + @Override + public int hashCode() { + return Objects.hash(ryaInstance, type, queryId, query); + } + + @Override + public boolean equals(final Object o) { + if(o instanceof QueryEvent) { + final QueryEvent other = (QueryEvent) o; + return Objects.equals(ryaInstance, other.ryaInstance) && + Objects.equals(type, other.type) && + Objects.equals(queryId, other.queryId) && + Objects.equals(query, other.query); + } + return false; + } + + @Override + public String toString() { + final StringBuilder string = new StringBuilder(); + string.append("Query Event {\n") + .append(" Rya Instance:").append(ryaInstance).append(",\n") + .append(" Type: ").append(type).append(",\n"); + switch(type) { + case EXECUTING: + append(string, query.get()); + break; + case STOPPED: + string.append(" Query ID: ").append(queryId.get()).append("\n"); + break; + case STOP_ALL: + break; + default: + // Default to showing everything that is in the object. + string.append(" Query ID: ").append(queryId.get()).append("\n"); + append(string, query.get()); + break; + } + string.append("}"); + return string.toString(); + } + + private void append(final StringBuilder string, final StreamsQuery query) { + requireNonNull(string); + requireNonNull(query); + string.append(" Streams Query {\n") + .append(" Query ID: ").append(query.getQueryId()).append(",\n") + .append(" Is Active: ").append(query.isActive()).append(",\n") + .append(" SPARQL: ").append(query.getSparql()).append("\n") --- End diff -- Wrap query.getSparql() in LogUtils.clean()
---