Github user meiercaleb commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/177#discussion_r128001523
  
    --- Diff: 
extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplication.java
 ---
    @@ -0,0 +1,212 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.rya.periodic.notification.application;
    +
    +import org.apache.log4j.Logger;
    +import org.apache.rya.indexing.pcj.fluo.app.util.PeriodicQueryUtil;
    +import org.apache.rya.periodic.notification.api.BinPruner;
    +import org.apache.rya.periodic.notification.api.LifeCycle;
    +import org.apache.rya.periodic.notification.api.NodeBin;
    +import 
org.apache.rya.periodic.notification.api.NotificationCoordinatorExecutor;
    +import org.apache.rya.periodic.notification.exporter.BindingSetRecord;
    +import org.apache.rya.periodic.notification.exporter.KafkaExporterExecutor;
    +import 
org.apache.rya.periodic.notification.processor.NotificationProcessorExecutor;
    +import 
org.apache.rya.periodic.notification.pruner.PeriodicQueryPrunerExecutor;
    +import 
org.apache.rya.periodic.notification.registration.kafka.KafkaNotificationProvider;
    +import org.openrdf.query.algebra.evaluation.function.Function;
    +
    +import com.google.common.base.Preconditions;
    +
    +/**
    + * The PeriodicNotificationApplication runs the key components of the 
Periodic
    + * Query Service. It consists of a {@link KafkaNotificationProvider}, a
    + * {@link NotificationCoordinatorExecutor}, a
    + * {@link NotificationProcessorExecutor}, a {@link KafkaExporterExecutor}, 
and a
    + * {@link PeriodicQueryPrunerExecutor}. These services run in coordination 
with
    + * one another to perform the following tasks in the indicated order: <br>
    + * <li>Retrieve new requests to generate periodic notifications from Kafka
    + * <li>Register them with the {@link NotificationCoordinatorExecutor} to
    + * generate the periodic notifications
    + * <li>As notifications are generated, they are added to a work queue that 
is
    + * monitored by the {@link NotificationProcessorExecutor}.
    + * <li>The processor processes the notifications by reading all of the 
query
    + * results corresponding to the bin and query id indicated by the 
notification.
    + * <li>After reading the results, the processor adds a {@link 
BindingSetRecord}
    + * to a work queue monitored by the {@link KafkaExporterExecutor}.
    + * <li>The processor then adds a {@link NodeBin} to a workqueue monitored 
by the
    + * {@link BinPruner}
    + * <li>The exporter processes the BindingSetRecord by exporing the result 
to
    + * Kafka
    + * <li>The BinPruner processes the NodeBin by cleaning up the results for 
the
    + * indicated bin and query in Accumulo and Fluo. <br>
    + * <br>
    + * The purpose of this Periodic Query Service is to facilitate the ability 
to
    + * answer Periodic Queries using the Rya Fluo application, where a Periodic
    + * Query is any query requesting periodic updates about events that 
occurred
    + * within a given window of time of this instant. This is also known as a
    + * rolling window query. Period Queries can be expressed using SPARQL by
    + * including the {@link Function} indicated by the URI
    + * {@link PeriodicQueryUtil#PeriodicQueryURI}. The user must provide this
    + * Function with the following arguments: the temporal variable in the 
query
    + * that will be filtered on, the window of time that events must occur 
within,
    + * the period at which the user wants to receive updates, and the time 
unit. The
    + * following query requests all observations that occurred within the last
    + * minute and requests updates every 15 seconds. It also performs a count 
on
    + * those observations. <br>
    + * <br>
    + * <li>prefix function: http://org.apache.rya/function#
    + * <li>"prefix time: http://www.w3.org/2006/time#
    + * <li>"select (count(?obs) as ?total) where {
    + * <li>"Filter(function:periodic(?time, 1, .25, time:minutes))
    + * <li>"?obs uri:hasTime ?time.
    + * <li>"?obs uri:hasId ?id }
    + * <li>
    + */
    +public class PeriodicNotificationApplication implements LifeCycle {
    +
    +    private static final Logger log = 
Logger.getLogger(PeriodicNotificationApplication.class);
    +    private NotificationCoordinatorExecutor coordinator;
    +    private KafkaNotificationProvider provider;
    +    private PeriodicQueryPrunerExecutor pruner;
    +    private NotificationProcessorExecutor processor;
    +    private KafkaExporterExecutor exporter;
    +    private boolean running = false;
    +
    +    /**
    +     * Creates a PeriodicNotificationApplication
    +     * @param provider - {@link KafkaNotificationProvider} that retrieves 
new Notificaiton requests from Kafka
    +     * @param coordinator - {NotificationCoordinator} that manages 
PeriodicNotifications.
    +     * @param processor - {@link NotificationProcessorExecutor} that 
processes PeriodicNotifications
    +     * @param exporter - {@link KafkaExporterExecutor} that exports 
periodic results
    +     * @param pruner - {@link PeriodicQueryPrunerExecutor} that cleans up 
old periodic bins
    +     */
    +    public PeriodicNotificationApplication(KafkaNotificationProvider 
provider, NotificationCoordinatorExecutor coordinator,
    +            NotificationProcessorExecutor processor, KafkaExporterExecutor 
exporter, PeriodicQueryPrunerExecutor pruner) {
    +        Preconditions.checkNotNull(provider);
    +        Preconditions.checkNotNull(coordinator);
    +        Preconditions.checkNotNull(processor);
    --- End diff --
    
    done


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to