[ 
https://issues.apache.org/jira/browse/RYA-443?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16358526#comment-16358526
 ] 

ASF GitHub Bot commented on RYA-443:
------------------------------------

Github user ejwhite922 commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/272#discussion_r167240617
  
    --- Diff: 
extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryManager.java
 ---
    @@ -0,0 +1,884 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.rya.streams.querymanager;
    +
    +import static java.util.Objects.requireNonNull;
    +
    +import java.util.HashMap;
    +import java.util.Map;
    +import java.util.Objects;
    +import java.util.Optional;
    +import java.util.Set;
    +import java.util.UUID;
    +import java.util.concurrent.ArrayBlockingQueue;
    +import java.util.concurrent.BlockingQueue;
    +import java.util.concurrent.CountDownLatch;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.RejectedExecutionException;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicBoolean;
    +
    +import org.apache.rya.streams.api.entity.StreamsQuery;
    +import org.apache.rya.streams.api.queries.ChangeLogEntry;
    +import org.apache.rya.streams.api.queries.InMemoryQueryRepository;
    +import org.apache.rya.streams.api.queries.QueryChange;
    +import org.apache.rya.streams.api.queries.QueryChangeLog;
    +import org.apache.rya.streams.api.queries.QueryChangeLogListener;
    +import org.apache.rya.streams.api.queries.QueryRepository;
    +import 
org.apache.rya.streams.querymanager.QueryChangeLogSource.SourceListener;
    +import 
org.apache.rya.streams.querymanager.QueryExecutor.QueryExecutorException;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import com.google.common.base.Preconditions;
    +import 
com.google.common.util.concurrent.AbstractScheduledService.Scheduler;
    +import com.google.common.util.concurrent.AbstractService;
    +import com.google.common.util.concurrent.UncheckedExecutionException;
    +
    +import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
    +import edu.umd.cs.findbugs.annotations.NonNull;
    +
    +/**
    + * A service for managing {@link StreamsQuery} running on a Rya Streams 
system.
    + * <p>
    + * Only one QueryManager needs to be running to manage any number of rya
    + * instances/rya streams instances.
    + */
    +@DefaultAnnotation(NonNull.class)
    +public class QueryManager extends AbstractService {
    +    private static final Logger log = 
LoggerFactory.getLogger(QueryManager.class);
    +
    +    /**
    +     * The source of {@link QueryChangeLog}s. Each log discovered is bound 
to a specific
    +     * Rya instance.
    +     */
    +    private final QueryChangeLogSource changeLogSource;
    +
    +    /**
    +     * The engine that is responsible for executing {@link StreamsQuery}s.
    +     */
    +    private final QueryExecutor queryExecutor;
    +
    +    /**
    +     * How long blocking operations will be attempted before potentially 
trying again.
    +     */
    +    private final long blockingValue;
    +
    +    /**
    +     * The units for {@link #blockingValue}.
    +     */
    +    private final TimeUnit blockingUnits;
    +
    +    /**
    +     * Used to inform threads that the application is shutting down, so 
they must stop work.
    +     */
    +    private final AtomicBoolean shutdownSignal = new AtomicBoolean(false);
    +
    +    /**
    +     * This thread pool manages the two threads used to work the {@link 
LogEvent}s
    +     * and the {@link QueryEvent}s.
    +     */
    +    private final ExecutorService executor = 
Executors.newFixedThreadPool(2);
    +
    +    /**
    +     * Creates a new {@link QueryManager}.
    +     *
    +     * @param queryExecutor - Runs the active {@link StreamsQuery}s. (not 
null)
    +     * @param source - The {@link QueryChangeLogSource} of 
QueryChangeLogs. (not null)
    +     * @param blockingValue - How long blocking operations will try before 
looping. (> 0)
    +     * @param blockingUnits - The units of the {@code blockingValue}. (not 
null)
    +     */
    +    public QueryManager(
    +            final QueryExecutor queryExecutor,
    +            final QueryChangeLogSource source,
    +            final long blockingValue,
    +            final TimeUnit blockingUnits) {
    +        this.changeLogSource = requireNonNull(source);
    +        this.queryExecutor = requireNonNull(queryExecutor);
    +        Preconditions.checkArgument(blockingValue > 0, "The blocking value 
must be > 0. Was: " + blockingValue);
    +        this.blockingValue = blockingValue;
    +        this.blockingUnits = requireNonNull(blockingUnits);
    +    }
    +
    +    @Override
    +    protected void doStart() {
    +        log.info("Starting a QueryManager.");
    +
    +        // A work queue of discovered Query Change Logs that need to be 
handled.
    +        // This queue exists so that the source notifying thread may be 
released
    +        // immediately instead of calling into blocking functions.
    +        final BlockingQueue<LogEvent> logEvents = new 
ArrayBlockingQueue<>(1024);
    +
    +        // A work queue of discovered Query Changes from the monitored 
Query Change Logs
    +        // that need to be handled. This queue exists so that the Query 
Repository notifying
    +        // thread may be released immediately instead of calling into 
blocking functions.
    +        final BlockingQueue<QueryEvent> queryEvents = new 
ArrayBlockingQueue<>(1024);
    +
    +        try {
    +            // Start up a LogEventWorker using the executor service.
    +            executor.submit(new LogEventWorker(logEvents, queryEvents, 
blockingValue, blockingUnits, shutdownSignal));
    +
    +            // Start up a QueryEvent Worker using the executor service.
    +            executor.submit(new QueryEventWorker(queryEvents, 
queryExecutor, blockingValue, blockingUnits, shutdownSignal));
    +
    +            // Start up the query execution framework.
    +            queryExecutor.startAndWait();
    +
    +            // Startup the source that discovers new Query Change Logs.
    +            changeLogSource.startAndWait();
    +
    +            // Subscribe to the source with a listener that writes to the 
LogEventWorker's work queue.
    +            changeLogSource.subscribe(new LogEventWorkGenerator(logEvents, 
blockingValue, blockingUnits, shutdownSignal));
    +        } catch(final RejectedExecutionException | 
UncheckedExecutionException e) {
    +            log.error("Could not start up a QueryManager.", e);
    +            notifyFailed(e);
    +        }
    +
    +        // Notify the service was successfully started.
    +        notifyStarted();
    +
    +        log.info("QueryManager has finished starting.");
    +    }
    +
    +    @Override
    +    protected void doStop() {
    +        log.info("Stopping a QueryManager.");
    +
    +        // Set the shutdown flag so that all components that rely on that 
signal will stop processing.
    +        shutdownSignal.set(true);
    +
    +        // Stop the workers and wait for them to die.
    +        executor.shutdownNow();
    +        try {
    +            if(!executor.awaitTermination(10, TimeUnit.SECONDS)) {
    +                log.warn("Waited 10 seconds for the worker threads to die, 
but they are still running.");
    +            }
    +        } catch (final InterruptedException e) {
    +            log.warn("Waited 10 seconds for the worker threads to die, but 
they are still running.");
    +        }
    +
    +        // Stop the source of new Change Logs.
    +        try {
    +            changeLogSource.stopAndWait();
    +        } catch(final UncheckedExecutionException e) {
    +            log.warn("Could not stop the Change Log Source.", e);
    +        }
    +
    +        // Stop the query execution framework.
    +        try {
    +            queryExecutor.stopAndWait();
    +        } catch(final UncheckedExecutionException e) {
    +            log.warn("Could not stop the Query Executor", e);
    +        }
    +
    +        // Notify the service was successfully stopped.
    +        notifyStopped();
    +
    +        log.info("QueryManager has finished stopping.");
    +    }
    +
    +    /**
    +     * Offer a unit of work to a blocking queue until it is either 
accepted, or the
    +     * shutdown signal is set.
    +     *
    +     * @param workQueue - The blocking work queue to write to. (not null)
    +     * @param event - The event that will be offered to the work queue. 
(not null)
    +     * @param offerValue - How long to wait when offering new work.
    +     * @param offerUnits - The unit for the {@code offerValue}. (not null)
    +     * @param shutdownSignal - Used to signal application shutdown has 
started, so
    +     *   this method may terminate without ever placing the event on the 
queue. (not null)
    +     * @return {@code true} if the event was added to the queue, otherwise 
false.
    +     */
    +    private static <T> boolean offerUntilAcceptedOrShutdown(
    +            final BlockingQueue<T> workQueue,
    +            final T event, final
    +            long offerValue,
    +            final TimeUnit offerUnits,
    +            final AtomicBoolean shutdownSignal) {
    +        requireNonNull(workQueue);
    +        requireNonNull(event);
    +        requireNonNull(shutdownSignal);
    +
    +        boolean submitted = false;
    +        while(!submitted && !shutdownSignal.get()) {
    +            try {
    +                submitted = workQueue.offer(event, offerValue, offerUnits);
    +                if(!submitted) {
    +                    log.debug("An event could not be added to a work queue 
after waiting 5 seconds. Trying again...");
    +                }
    +            } catch (final InterruptedException e) {
    +                log.debug("An event could not be added to a work queue 
after waiting 5 seconds. Trying again...");
    +            }
    +        }
    +        return submitted;
    +    }
    +
    +    /**
    +     * An observation that a {@link QueryChangeLog} was created within or
    +     * removed from a {@link QueryChangeLogSource}.
    +     */
    +    @DefaultAnnotation(NonNull.class)
    +    static class LogEvent {
    +
    +        /**
    +         * The types of events that may be observed.
    +         */
    +        static enum LogEventType {
    +            /**
    +             * A {@link QueryChangeLog} was created within a {@link 
QueryChangeLogSource}.
    +             */
    +            CREATE,
    +
    +            /**
    +             * A {@link QueryChangeLog} was deleted from a {@link 
QueryChangeLogSource}.
    +             */
    +            DELETE;
    +        }
    +
    +        private final String ryaInstance;
    +        private final LogEventType eventType;
    +        private final Optional<QueryChangeLog> log;
    +
    +        /**
    +         * Constructs an instance of {@link LogEvent}.
    +         *
    +         * @param ryaInstance - The Rya Instance the log is/was for. (not 
null)
    +         * @param eventType - The type of event that was observed. (not 
null)
    +         * @param log - The log if this is a create event. (not null)
    +         */
    +        private LogEvent(final String ryaInstance, final LogEventType 
eventType, final Optional<QueryChangeLog> log) {
    +            this.ryaInstance = requireNonNull(ryaInstance);
    +            this.eventType = requireNonNull(eventType);
    +            this.log = requireNonNull(log);
    +        }
    +
    +        /**
    +         * @return The Rya Instance whose log was either created or 
deleted.
    +         */
    +        public String getRyaInstanceName() {
    +            return ryaInstance;
    +        }
    +
    +        /**
    +         * @return The type of event that was observed.
    +         */
    +        public LogEventType getEventType() {
    +            return eventType;
    +        }
    +
    +        /**
    +         * @return The {@link QueryChangeLog} if this is a CREATE event.
    +         */
    +        public Optional<QueryChangeLog> getQueryChangeLog() {
    +            return log;
    +        }
    +
    +        @Override
    +        public String toString() {
    +            return "LogEvent {\n" +
    +                   "    Rya Instance: " + ryaInstance + ",\n" +
    +                   "    Event Type: " + eventType + "\n" +
    +                   "}";
    +        }
    +
    +        /**
    +         * Make a {@link LogEvent} that indicates a {@link QueryChangeLog} 
was created within a
    +         * {@link QueryChangeLogSource}.
    +         *
    +         * @param ryaInstance - The Rya Instance the created log is for. 
(not null)
    +         * @param log - The created {@link QueryChangeLog}. (not null)
    +         * @return A {@link LogEvent} built using the provided values.
    +         */
    +        public static LogEvent create(final String ryaInstance, final 
QueryChangeLog log) {
    +            return new LogEvent(ryaInstance, LogEventType.CREATE 
,Optional.of(log));
    --- End diff --
    
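    Fix the spacing between the last two parameters: `LogEventType.CREATE ,Optional.of(log)` should be `LogEventType.CREATE, Optional.of(log)`.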


> Implement a single node query manager
> -------------------------------------
>
>                 Key: RYA-443
>                 URL: https://issues.apache.org/jira/browse/RYA-443
>             Project: Rya
>          Issue Type: Task
>            Reporter: Andrew Smith
>            Assignee: Kevin Chilton
>            Priority: Major
>
> We need an application that watches the QueryChangeLog to see when the 
> isActive state of queries changes and then reacts to the state change. If 
> isActive goes to true, the system must start processing the query. If it is 
> false, then it must stop processing the query. This application needs to 
> start when the host machine starts. We plan to support CentOS 7.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to