[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service

2017-08-02 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/incubator-rya/pull/177


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service

2017-08-02 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/177#discussion_r130935217
  
--- Diff: 
extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/PeriodicQueryStorageMetadata.java
 ---
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.storage;
+
+import java.util.Objects;
+
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
+
+import com.google.common.base.Preconditions;
+
+/**
+ *  Metadata for a given PeriodicQueryStorage table. 
+ */
+public class PeriodicQueryStorageMetadata {
+
+private String sparql;
+private VariableOrder varOrder;
+
+/**
+ * Create a PeriodicQueryStorageMetadat object
--- End diff --

done


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service

2017-07-26 Thread amihalik
Github user amihalik commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/177#discussion_r129669294
  
--- Diff: 
extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/PeriodicQueryStorageMetadata.java
 ---
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.storage;
+
+import java.util.Objects;
+
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
+
+import com.google.common.base.Preconditions;
+
+/**
+ *  Metadata for a given PeriodicQueryStorage table. 
+ */
+public class PeriodicQueryStorageMetadata {
+
+private String sparql;
+private VariableOrder varOrder;
+
+/**
+ * Create a PeriodicQueryStorageMetadat object
--- End diff --

typo: ...Metadat**a**


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service

2017-07-26 Thread amihalik
Github user amihalik commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/177#discussion_r129670570
  
--- Diff: 
extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/accumulo/AccumuloPeriodicQueryResultStorageIT.java
 ---
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.storage.accumulo.accumulo;
--- End diff --

accumulo.accumulo?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service

2017-07-26 Thread amihalik
Github user amihalik commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/177#discussion_r129668480
  
--- Diff: 
extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/PeriodicQueryTableNameFactory.java
 ---
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.storage.accumulo;
+
+import static java.util.Objects.requireNonNull;
+
+import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
+
+/**
+ * Class for creating the names of {@link PeriodicQueryResultStorage} 
tables.
+ *
+ */
+public class PeriodicQueryTableNameFactory {
+
+public static final String PeriodicTableSuffix = "PERIODIC_QUERY_";
--- End diff --

"midfix" :)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service

2017-07-18 Thread isper3at
Github user isper3at commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/177#discussion_r128063437
  
--- Diff: 
extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaBindingSetExporter.java
 ---
@@ -36,10 +36,11 @@
  * Incrementally exports SPARQL query results to Kafka topics.
  */
 public class KafkaBindingSetExporter implements 
IncrementalBindingSetExporter {
+
--- End diff --

I meant the file had no changes in it, so you could revert it and remove it 
from the PR


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service

2017-07-18 Thread isper3at
Github user isper3at commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/177#discussion_r128063325
  
--- Diff: 
extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/SpanBatchBindingSetUpdater.java
 ---
@@ -0,0 +1,128 @@
+package org.apache.rya.indexing.pcj.fluo.app.batch;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import java.util.Iterator;
+import java.util.Optional;
+
+import org.apache.fluo.api.client.TransactionBase;
+import org.apache.fluo.api.client.scanner.ColumnScanner;
+import org.apache.fluo.api.client.scanner.RowScanner;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.data.ColumnValue;
+import org.apache.fluo.api.data.RowColumn;
+import org.apache.fluo.api.data.Span;
+import org.apache.log4j.Logger;
+import org.apache.rya.indexing.pcj.fluo.app.batch.BatchInformation.Task;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * This class processes {@link SpanBatchDeleteInformation} objects by
+ * deleting the entries in the Fluo Column corresponding to the {@link 
Span}
+ * of the BatchInformation object.  This class will delete entries until 
the
+ * batch size is met, and then create a new SpanBatchDeleteInformation 
object
+ * with an updated Span whose starting point is the stopping point of this
+ * batch.  If the batch limit is not met, then a new batch is not created 
and
+ * the task is complete.
+ *
+ */
+public class SpanBatchBindingSetUpdater extends 
AbstractBatchBindingSetUpdater {
+
+private static final Logger log = 
Logger.getLogger(SpanBatchBindingSetUpdater.class);
+
+/**
+ * Process SpanBatchDeleteInformation objects by deleting all entries 
indicated
+ * by Span until batch limit is met.
+ * @param tx - Fluo Transaction
+ * @param row - Byte row identifying BatchInformation
+ * @param batch - SpanBatchDeleteInformation object to be processed
+ */
+@Override
+public void processBatch(TransactionBase tx, Bytes row, 
BatchInformation batch) throws Exception {
+super.processBatch(tx, row, batch);
+Preconditions.checkArgument(batch instanceof 
SpanBatchDeleteInformation);
+SpanBatchDeleteInformation spanBatch = 
(SpanBatchDeleteInformation) batch;
+Task task = spanBatch.getTask();
+int batchSize = spanBatch.getBatchSize();
+Span span = spanBatch.getSpan();
+Column column = batch.getColumn();
+Optional rowCol = Optional.empty();
+
+switch (task) {
+case Add:
+log.trace("The Task Add is not supported for 
SpanBatchBindingSetUpdater.  Batch " + batch + " will not be processed.");
--- End diff --

fair enough


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service

2017-07-18 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/177#discussion_r128043592
  
--- Diff: 
extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/pruner/PeriodicQueryPruner.java
 ---
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.pruner;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.fluo.api.client.FluoClient;
+import org.apache.fluo.api.client.Snapshot;
+import org.apache.fluo.api.client.SnapshotBase;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.log4j.Logger;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
+import org.apache.rya.indexing.pcj.fluo.app.util.PeriodicQueryUtil;
+import org.apache.rya.periodic.notification.api.BinPruner;
+import org.apache.rya.periodic.notification.api.NodeBin;
+
+import jline.internal.Preconditions;
+
+/**
+ * Implementation of {@link BinPruner} that deletes old, already processed
+ * Periodic Query results from Fluo and the PCJ table to which the Fluo 
results
+ * are exported.
+ *
+ */
+public class PeriodicQueryPruner implements BinPruner, Runnable {
+
+private static final Logger log = 
Logger.getLogger(PeriodicQueryPruner.class);
+private FluoClient client;
+private AccumuloBinPruner accPruner;
+private FluoBinPruner fluoPruner;
+private BlockingQueue bins;
+private AtomicBoolean closed = new AtomicBoolean(false);
+private int threadNumber;
+
+public PeriodicQueryPruner(FluoBinPruner fluoPruner, AccumuloBinPruner 
accPruner, FluoClient client, BlockingQueue bins, int threadNumber) {
+Preconditions.checkNotNull(fluoPruner);
+Preconditions.checkNotNull(accPruner);
+Preconditions.checkNotNull(client);
+this.client = client;
--- End diff --

done


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service

2017-07-18 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/177#discussion_r128043280
  
--- Diff: 
extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/processor/TimestampedNotificationProcessor.java
 ---
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.processor;
+
+import java.util.Optional;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.log4j.Logger;
+import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
+import 
org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator;
+import org.apache.rya.periodic.notification.api.BinPruner;
+import org.apache.rya.periodic.notification.api.NodeBin;
+import org.apache.rya.periodic.notification.api.NotificationProcessor;
+import org.apache.rya.periodic.notification.exporter.BindingSetRecord;
+import 
org.apache.rya.periodic.notification.exporter.KafkaPeriodicBindingSetExporter;
+import 
org.apache.rya.periodic.notification.notification.TimestampedNotification;
+import org.openrdf.query.BindingSet;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Implementation of {@link NotificationProcessor} that uses the id 
indicated by
+ * the {@link TimestampedNotification} to obtain results from the
+ * {@link PeriodicQueryResultStorage} layer containing the results of the
+ * Periodic Query. The TimestampedNotificationProcessor then parses the 
results
+ * and adds them to work queues to be processed by the {@link BinPruner} 
and the
+ * {@link KafkaPeriodicBindingSetExporter}.
+ *
+ */
+public class TimestampedNotificationProcessor implements 
NotificationProcessor, Runnable {
+
+private static final Logger log = 
Logger.getLogger(TimestampedNotificationProcessor.class);
+private PeriodicQueryResultStorage periodicStorage;
+private BlockingQueue notifications; // 
notifications
+  // to 
process
+private BlockingQueue bins; // entries to delete from Fluo
+private BlockingQueue bindingSets; // query results 
to export
+private AtomicBoolean closed = new AtomicBoolean(false);
+private int threadNumber;
+
+
+public TimestampedNotificationProcessor(PeriodicQueryResultStorage 
periodicStorage,
+BlockingQueue notifications, 
BlockingQueue bins, BlockingQueue bindingSets,
+int threadNumber) {
+Preconditions.checkNotNull(notifications);
+Preconditions.checkNotNull(bins);
+Preconditions.checkNotNull(bindingSets);
--- End diff --

done


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service

2017-07-18 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/177#discussion_r128042903
  
--- Diff: 
extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/processor/NotificationProcessorExecutor.java
 ---
@@ -0,0 +1,117 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */package org.apache.rya.periodic.notification.processor;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.log4j.Logger;
+import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
+import org.apache.rya.periodic.notification.api.LifeCycle;
+import org.apache.rya.periodic.notification.api.NodeBin;
+import org.apache.rya.periodic.notification.exporter.BindingSetRecord;
+import 
org.apache.rya.periodic.notification.notification.TimestampedNotification;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Executor service that runs {@link TimestampedNotificationProcessor}s 
with basic
+ * functionality for starting, stopping, and determining whether 
notification processors are
+ * being executed. 
+ *
+ */
+public class NotificationProcessorExecutor implements LifeCycle {
+
+private static final Logger log = 
Logger.getLogger(TimestampedNotificationProcessor.class);
+private BlockingQueue notifications; // 
notifications
+private BlockingQueue bins; // entries to delete from Fluo
+private BlockingQueue bindingSets; // query results 
to
+ // export
+private PeriodicQueryResultStorage periodicStorage;
+private List processors;
+private int numberThreads;
+private ExecutorService executor;
+private boolean running = false;
+
+/**
+ * Creates NotificationProcessorExecutor.
+ * @param periodicStorage - storage layer that periodic results are 
read from
+ * @param notifications - notifications are pulled from this queue, 
and the timestamp indicates which bin of results to query for
+ * @param bins - after notifications are processed, they are added to 
the bin to be deleted
+ * @param bindingSets - results read from the storage layer to be 
exported
+ * @param numberThreads - number of threads used for processing
+ */
+public NotificationProcessorExecutor(PeriodicQueryResultStorage 
periodicStorage, BlockingQueue notifications,
+BlockingQueue bins, BlockingQueue 
bindingSets, int numberThreads) {
+Preconditions.checkNotNull(notifications);
+Preconditions.checkNotNull(bins);
+Preconditions.checkNotNull(bindingSets);
+this.notifications = notifications;
--- End diff --

done


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service

2017-07-18 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/177#discussion_r128042579
  
--- Diff: 
extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/notification/PeriodicNotification.java
 ---
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.notification;
+
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.rya.periodic.notification.api.Notification;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Notification Object used by the Periodic Query Service to inform 
workers to
+ * process results for a given Periodic Query with the indicated id.
+ * Additionally, this Object contains a period that indicates a frequency 
at
+ * which regular updates are generated.
+ *
+ */
+public class PeriodicNotification implements Notification {
+
+private String id;
+private long period;
+private TimeUnit periodTimeUnit;
+private long initialDelay;
+
+/**
+ * Creates a PeriodicNotification.
+ * @param id - Fluo Query Id that this notification is associated with
+ * @param period - period at which notifications are generated
+ * @param periodTimeUnit - time unit associated with the period and 
delay
+ * @param initialDelay - amount of time to wait before generating the 
first notification
+ */
+public PeriodicNotification(String id, long period, TimeUnit 
periodTimeUnit, long initialDelay) {
+Preconditions.checkNotNull(id);
+Preconditions.checkNotNull(periodTimeUnit);
+Preconditions.checkArgument(period > 0 && initialDelay >= 0);
+this.id = id;
+this.period = period;
+this.periodTimeUnit = periodTimeUnit;
+this.initialDelay = initialDelay;
+}
+
+
+/**
+ * Create a PeriodicNotification
+ * @param other - other PeriodicNotification used in copy constructor
+ */
+public PeriodicNotification(PeriodicNotification other) {
+this(other.id, other.period, other.periodTimeUnit, 
other.initialDelay);
+}
+
+public String getId() {
+return id;
+}
+
+/**
+ * @return - period at which regular notifications are generated
+ */
+public long getPeriod() {
+return period;
+}
+
+/**
+ * @return time unit of period and initial delay
+ */
+public TimeUnit getTimeUnit() {
+return periodTimeUnit;
+}
+
+/**
+ * @return amount of time to delay before beginning to generate 
notifications
+ */
+public long getInitialDelay() {
+return initialDelay;
+}
+
+@Override
+public String toString() {
+StringBuilder builder = new StringBuilder();
+String delim = "=";
+String delim2 = ";";
+return 
builder.append("id").append(delim).append(id).append(delim2).append("period").append(delim).append(period).append(delim2)
+
.append("periodTimeUnit").append(delim).append(periodTimeUnit).append(delim2).append("initialDelay").append(delim)
+.append(initialDelay).toString();
+}
+
+@Override
+public boolean equals(Object other) {
+if (this == other) {
+return true;
+}
+
+if (!(other instanceof PeriodicNotification)) {
+return false;
+}
+
+PeriodicNotification notification = (PeriodicNotification) other;
+return Objects.equals(this.id, notification.id) && (this.period == 
notification.period) 
+&& Objects.equals(this.periodTimeUnit, 
notification.periodTimeUnit) && (this.initialDelay == 
notification.initialDelay);
+

[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service

2017-07-18 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/177#discussion_r128042239
  
--- Diff: 
extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/notification/PeriodicNotification.java
 ---
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.notification;
+
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.rya.periodic.notification.api.Notification;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Notification Object used by the Periodic Query Service to inform 
workers to
+ * process results for a given Periodic Query with the indicated id.
+ * Additionally, this Object contains a period that indicates a frequency 
at
+ * which regular updates are generated.
+ *
+ */
+public class PeriodicNotification implements Notification {
+
+private String id;
+private long period;
+private TimeUnit periodTimeUnit;
+private long initialDelay;
+
+/**
+ * Creates a PeriodicNotification.
+ * @param id - Fluo Query Id that this notification is associated with
+ * @param period - period at which notifications are generated
+ * @param periodTimeUnit - time unit associated with the period and 
delay
+ * @param initialDelay - amount of time to wait before generating the 
first notification
+ */
+public PeriodicNotification(String id, long period, TimeUnit 
periodTimeUnit, long initialDelay) {
+Preconditions.checkNotNull(id);
+Preconditions.checkNotNull(periodTimeUnit);
+Preconditions.checkArgument(period > 0 && initialDelay >= 0);
--- End diff --

done


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service

2017-07-18 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/177#discussion_r128041892
  
--- Diff: 
extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/notification/CommandNotification.java
 ---
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.notification;
+
+import org.apache.rya.periodic.notification.api.Notification;
+
+import com.google.common.base.Objects;
+import com.google.common.base.Preconditions;
+
+/**
+ * This Object contains a Notification Object used by the Periodic Query 
Service
+ * to inform workers to process results for a given Periodic Query with the
+ * indicated id. Additionally, the CommandNotification contains a
+ * {@link Command} about which action the
+ * {@link NotificationCoordinatorExecutor} should take (adding or 
deleting).
+ * CommandNotifications are meant to be added to an external work queue 
(such as
+ * Kafka) to be processed by the NotificationCoordinatorExecutor.
+ *
+ */
+public class CommandNotification implements Notification {
+
+private Notification notification;
+private Command command;
+
+public enum Command {
+ADD, DELETE
+};
+
+/**
+ * Creates a new CommandNotification
+ * @param command - the command associated with this notification 
(either add, update, or delete)
+ * @param notification - the underlying notification associated with 
this command
+ */
+public CommandNotification(Command command, Notification notification) 
{
+Preconditions.checkNotNull(notification);
+Preconditions.checkNotNull(command);
+this.command = command;
+this.notification = notification;
+}
+
+@Override
+public String getId() {
+return notification.getId();
+}
+
+/**
+ * Returns {@link Notification} contained by this CommmandNotification.
+ * @return - Notification contained by this Object
+ */
+public Notification getNotification() {
+return this.notification;
+}
+
+/**
+ * @return Command contained by this Object (either add or delete)
+ */
+public Command getCommand() {
+return this.command;
+}
+
+@Override
+public boolean equals(Object other) {
+if (this == other) {
+return true;
+}
+if (other instanceof CommandNotification) {
+CommandNotification cn = (CommandNotification) other;
+return Objects.equal(this.command, cn.command) && 
Objects.equal(this.notification, cn.notification);
+} else {
+return false;
+}
+}
+
+@Override
+public int hashCode() {
+int result = 17;
+result = 31 * result + Objects.hashCode(command);
--- End diff --

done


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service

2017-07-18 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/177#discussion_r128041556
  
--- Diff: 
extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/notification/CommandNotification.java
 ---
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.notification;
+
+import org.apache.rya.periodic.notification.api.Notification;
+
+import com.google.common.base.Objects;
+import com.google.common.base.Preconditions;
+
+/**
+ * This Object contains a Notification Object used by the Periodic Query 
Service
+ * to inform workers to process results for a given Periodic Query with the
+ * indicated id. Additionally, the CommandNotification contains a
+ * {@link Command} about which action the
+ * {@link NotificationCoordinatorExecutor} should take (adding or 
deleting).
+ * CommandNotifications are meant to be added to an external work queue 
(such as
+ * Kafka) to be processed by the NotificationCoordinatorExecutor.
+ *
+ */
+public class CommandNotification implements Notification {
+
+private Notification notification;
+private Command command;
+
+public enum Command {
+ADD, DELETE
+};
+
+/**
+ * Creates a new CommandNotification
+ * @param command - the command associated with this notification 
(either add, update, or delete)
+ * @param notification - the underlying notification associated with 
this command
+ */
+public CommandNotification(Command command, Notification notification) 
{
+Preconditions.checkNotNull(notification);
+Preconditions.checkNotNull(command);
--- End diff --

done


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service

2017-07-18 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/177#discussion_r128041076
  
--- Diff: 
extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/notification/BasicNotification.java
 ---
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.notification;
+
+import org.apache.rya.periodic.notification.api.Notification;
+
+import com.google.common.base.Objects;
+
+/**
+ * Notification Object used by the Periodic Query Service
+ * to inform workers to process results for a given Periodic
+ * Query with the indicated id.
+ *
+ */
+public class BasicNotification implements Notification {
+
+private String id;
+
+/**
+ * Creates a BasicNotification
+ * @param id - Fluo query id associated with this Notification
+ */
+public BasicNotification(String id) {
+this.id = id;
+}
+
+/**
+ * @return the Fluo Query Id that this notification will generate 
results for
+ */
+@Override
+public String getId() {
+return id;
+}
+
+@Override
+public boolean equals(Object other) {
+if (this == other) {
+return true;
+}
+
+if (other instanceof BasicNotification) {
+BasicNotification not = (BasicNotification) other;
+return Objects.equal(this.id, not.id);
+}
+
+return false;
+}
+
+@Override
+public int hashCode() {
+int result = 17;
--- End diff --

done


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service

2017-07-18 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/177#discussion_r128001523
  
--- Diff: 
extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplication.java
 ---
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.application;
+
+import org.apache.log4j.Logger;
+import org.apache.rya.indexing.pcj.fluo.app.util.PeriodicQueryUtil;
+import org.apache.rya.periodic.notification.api.BinPruner;
+import org.apache.rya.periodic.notification.api.LifeCycle;
+import org.apache.rya.periodic.notification.api.NodeBin;
+import 
org.apache.rya.periodic.notification.api.NotificationCoordinatorExecutor;
+import org.apache.rya.periodic.notification.exporter.BindingSetRecord;
+import org.apache.rya.periodic.notification.exporter.KafkaExporterExecutor;
+import 
org.apache.rya.periodic.notification.processor.NotificationProcessorExecutor;
+import 
org.apache.rya.periodic.notification.pruner.PeriodicQueryPrunerExecutor;
+import 
org.apache.rya.periodic.notification.registration.kafka.KafkaNotificationProvider;
+import org.openrdf.query.algebra.evaluation.function.Function;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * The PeriodicNotificationApplication runs the key components of the 
Periodic
+ * Query Service. It consists of a {@link KafkaNotificationProvider}, a
+ * {@link NotificationCoordinatorExecutor}, a
+ * {@link NotificationProcessorExecutor}, a {@link KafkaExporterExecutor}, 
and a
+ * {@link PeriodicQueryPrunerExecutor}. These services run in coordination 
with
+ * one another to perform the following tasks in the indicated order: 
+ * Retrieve new requests to generate periodic notifications from Kafka
+ * Register them with the {@link NotificationCoordinatorExecutor} to
+ * generate the periodic notifications
+ * As notifications are generated, they are added to a work queue that 
is
+ * monitored by the {@link NotificationProcessorExecutor}.
+ * The processor processes the notifications by reading all of the 
query
+ * results corresponding to the bin and query id indicated by the 
notification.
+ * After reading the results, the processor adds a {@link 
BindingSetRecord}
+ * to a work queue monitored by the {@link KafkaExporterExecutor}.
+ * The processor then adds a {@link NodeBin} to a workqueue monitored 
by the
+ * {@link BinPruner}
+ * The exporter processes the BindingSetRecord by exporing the result 
to
+ * Kafka
+ * The BinPruner processes the NodeBin by cleaning up the results for 
the
+ * indicated bin and query in Accumulo and Fluo. 
+ * 
+ * The purpose of this Periodic Query Service is to facilitate the ability 
to
+ * answer Periodic Queries using the Rya Fluo application, where a Periodic
+ * Query is any query requesting periodic updates about events that 
occurred
+ * within a given window of time of this instant. This is also known as a
+ * rolling window query. Period Queries can be expressed using SPARQL by
+ * including the {@link Function} indicated by the URI
+ * {@link PeriodicQueryUtil#PeriodicQueryURI}. The user must provide this
+ * Function with the following arguments: the temporal variable in the 
query
+ * that will be filtered on, the window of time that events must occur 
within,
+ * the period at which the user wants to receive updates, and the time 
unit. The
+ * following query requests all observations that occurred within the last
+ * minute and requests updates every 15 seconds. It also performs a count 
on
+ * those observations. 
+ * 
+ * prefix function: http://org.apache.rya/function#
+ * "prefix time: http://www.w3.org/2006/time#
+ * "select (count(?obs) as ?total) where {
+ * "Filter(function:periodic(?time, 1, .25, time:minutes))
+ * "?obs uri:hasTime ?time.
+ * "?obs u

[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service

2017-07-18 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/177#discussion_r128001253
  
--- Diff: 
extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/batch/serializer/BatchInformationSerializerTest.java
 ---
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.batch.serializer;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Arrays;
+import java.util.Optional;
+
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Span;
+import org.apache.rya.indexing.pcj.fluo.app.JoinResultUpdater.Side;
+import org.apache.rya.indexing.pcj.fluo.app.batch.BatchInformation;
+import org.apache.rya.indexing.pcj.fluo.app.batch.BatchInformation.Task;
+import org.apache.rya.indexing.pcj.fluo.app.batch.JoinBatchInformation;
+import 
org.apache.rya.indexing.pcj.fluo.app.batch.SpanBatchDeleteInformation;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
+import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata.JoinType;
+import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
+import org.junit.Test;
+import org.openrdf.model.impl.URIImpl;
+import org.openrdf.query.algebra.evaluation.QueryBindingSet;
+
+public class BatchInformationSerializerTest {
+
+@Test
+public void testSpanBatchInformationSerialization() {
+
+SpanBatchDeleteInformation batch = 
SpanBatchDeleteInformation.builder().setBatchSize(1000)
+
.setColumn(FluoQueryColumns.PERIODIC_QUERY_BINDING_SET).setSpan(Span.prefix(Bytes.of("prefix"))).build();
+System.out.println(batch);
+byte[] batchBytes = BatchInformationSerializer.toBytes(batch);
+Optional decodedBatch = 
BatchInformationSerializer.fromBytes(batchBytes);
+System.out.println(decodedBatch);
+assertEquals(batch, decodedBatch.get());
+}
+
+@Test
+public void testJoinBatchInformationSerialization() {
+
+QueryBindingSet bs = new QueryBindingSet();
+bs.addBinding("a", new URIImpl("urn:123"));
+bs.addBinding("b", new URIImpl("urn:456"));
+VisibilityBindingSet vBis = new VisibilityBindingSet(bs, "FOUO");
+
+JoinBatchInformation batch = 
JoinBatchInformation.builder().setBatchSize(1000).setTask(Task.Update)
+
.setColumn(FluoQueryColumns.PERIODIC_QUERY_BINDING_SET).setSpan(Span.prefix(Bytes.of("prefix346")))
+
.setJoinType(JoinType.LEFT_OUTER_JOIN).setSide(Side.RIGHT).setVarOrder(new 
VariableOrder(Arrays.asList("a", "b")))
+.setBs(vBis).build();
+
+System.out.println(batch);
--- End diff --

deleted


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service

2017-07-18 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/177#discussion_r128000960
  
--- Diff: 
extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/PeriodicQueryNode.java
 ---
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.query;
+
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.rya.indexing.pcj.fluo.app.util.PeriodicQueryUtil;
+import org.openrdf.query.algebra.QueryModelVisitor;
+import org.openrdf.query.algebra.TupleExpr;
+import org.openrdf.query.algebra.UnaryTupleOperator;
+import org.openrdf.query.algebra.evaluation.function.Function;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkArgument;
+
+/**
+ * This is a {@link UnaryTupleOperator} that gets placed in the parsed 
query
+ * {@link TupleExpr} when a {@link Filter} is encountered in the SPARQL 
String that
+ * contains the Periodic {@link Function} {@link 
PeriodicQueryUtil#PeriodicQueryURI}.
+ * The PeiodicQueryNode is created from the arguments passed to the 
Periodic Function,
+ * which consist of a time unit, a temporal period, a temporal window of 
time, and the
+ * temporal variable in the query, which assumes a value indicated by the
+ * Time ontology: http://www.w3.org/2006/time. The purpose of the 
PeriodicQueryNode
+ * is to filter out all events that did not occur within the specified 
window of time
+ * of this instant and to generate notifications at a regular interval 
indicated by the period.
+ *
+ */
+public class PeriodicQueryNode extends UnaryTupleOperator {
+
+private TimeUnit unit;
+private long windowDuration;
+private long periodDuration;
+private String temporalVar;
+
+/**
+ * Creates a PeriodicQueryNode from the specified values.
+ * @param window - specifies the window of time that event must occur 
within from this instant
+ * @param period - regular interval at which notifications are 
generated (must be leq window).
+ * @param unit - time unit of the period and window
+ * @param temporalVar - temporal variable in query used for filtering
+ * @param arg - child of PeriodicQueryNode in parsed query
+ */
+public PeriodicQueryNode(long window, long period, TimeUnit unit, 
String temporalVar, TupleExpr arg) {
+super(arg);
+checkArgument(period <= window);
+checkNotNull(temporalVar);
+checkNotNull(arg);
+checkNotNull(unit);
+this.windowDuration = window;
--- End diff --

done


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service

2017-07-18 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/177#discussion_r128000243
  
--- Diff: 
extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/PeriodicQueryMetadata.java
 ---
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.query;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
+
+import com.google.common.base.Objects;
+import com.google.common.base.Preconditions;
+
+/**
+ * Metadata that is required for periodic queries in the Rya Fluo 
Application.  
+ * If a periodic query is registered with the Rya Fluo application, the 
BindingSets
+ * are placed into temporal bins according to whether they occur within 
the window of
+ * a period's ending time.  This Metadata is used to create a Bin Id, 
which is equivalent
+ * to the period's ending time, to be inserted into each BindingSet that 
occurs within that
+ * bin.  This is to allow the AggregationUpdater to aggregate the bins by 
grouping on the 
+ * Bin Id.
+ * 
+ */
+public class PeriodicQueryMetadata extends CommonNodeMetadata {
+
+private String parentNodeId;
+private String childNodeId;
+private long windowSize;
+private long period;
+private TimeUnit unit;
+private String temporalVariable;
+
+/**
+ * Constructs an instance of PeriodicQueryMetadata
+ * @param nodeId - id of periodic query node
+ * @param varOrder - variable order indicating the order the 
BindingSet results are written in
+ * @param parentNodeId - id of parent node
+ * @param childNodeId - id of child node
+ * @param windowSize - size of window used for filtering
+ * @param period - period size that indicates frequency of 
notifications
+ * @param unit - TimeUnit corresponding to window and period
+ * @param temporalVariable - temporal variable that periodic 
conditions are applied to
+ */
+public PeriodicQueryMetadata(String nodeId, VariableOrder varOrder, 
String parentNodeId, String childNodeId, long windowSize, long period,
+TimeUnit unit, String temporalVariable) {
+super(nodeId, varOrder);
+Preconditions.checkNotNull(parentNodeId);
+Preconditions.checkNotNull(childNodeId);
+Preconditions.checkNotNull(temporalVariable);
+Preconditions.checkNotNull(unit);
+Preconditions.checkNotNull(period > 0);
--- End diff --

done


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service

2017-07-18 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/177#discussion_r127999634
  
--- Diff: 
extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/PeriodicQueryObserver.java
 ---
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.observers;
+
+import static java.util.Objects.requireNonNull;
+
+import org.apache.fluo.api.client.TransactionBase;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.rya.indexing.pcj.fluo.app.BindingSetRow;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO;
+import org.apache.rya.indexing.pcj.fluo.app.query.PeriodicQueryMetadata;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
+import 
org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSetSerDe;
+
--- End diff --

done


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service

2017-07-18 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/177#discussion_r127997700
  
--- Diff: 
extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaBindingSetExporter.java
 ---
@@ -36,10 +36,11 @@
  * Incrementally exports SPARQL query results to Kafka topics.
  */
 public class KafkaBindingSetExporter implements 
IncrementalBindingSetExporter {
+
--- End diff --

Okay.  Well that's good.  Can't tell if you're being sarcastic or 
something.  No need to leave a comment if nothing needs to be changed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service

2017-07-18 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/177#discussion_r127997246
  
--- Diff: 
extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/SpanBatchDeleteInformation.java
 ---
@@ -0,0 +1,92 @@
+package org.apache.rya.indexing.pcj.fluo.app.batch;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.data.Span;
+
+/**
+ * This class represents a batch order to delete all entries in the Fluo 
table indicated
+ * by the given Span and Column.  These batch orders are processed by the 
{@link BatchObserver},
+ * which uses this batch information along with the nodeId passed into the 
Observer to perform
+ * batch deletes.  
+ *
+ */
+public class SpanBatchDeleteInformation extends 
AbstractSpanBatchInformation {
+
+private static final BatchBindingSetUpdater updater = new 
SpanBatchBindingSetUpdater();
+
+public SpanBatchDeleteInformation(int batchSize, Column column, Span 
span) {
+super(batchSize, Task.Delete, column, span);
+}
+
+/**
+ * @return Updater that applies the {@link Task} to the given {@link 
Span} and {@link Column}
+ */
+@Override
+public BatchBindingSetUpdater getBatchUpdater() {
+return updater;
+}
+
+
+public static Builder builder() {
+return new Builder();
+}
+
+public static class Builder {
+
+private int batchSize = DEFAULT_BATCH_SIZE;
+private Column column;
+private Span span;
+
+/**
+ * @param batchSize - {@link Task}s are applied in batches of this 
size
+ */
+public Builder setBatchSize(int batchSize) {
+this.batchSize = batchSize;
+return this;
+}
+
+/**
+ * Sets column to apply batch {@link Task} to
+ * @param column - column batch Task will be applied to
+ * @return
+ */
+public Builder setColumn(Column column) {
+this.column = column;
+return this;
+}
+
+/**
+ * @param span - span that batch {@link Task} will be applied to
+ *
+ */
+public Builder setSpan(Span span) {
+this.span = span;
+return this;
+}
+
+
+public SpanBatchDeleteInformation build() {
--- End diff --

done


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service

2017-07-18 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/177#discussion_r127997010
  
--- Diff: 
extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/SpanBatchDeleteInformation.java
 ---
@@ -0,0 +1,92 @@
+package org.apache.rya.indexing.pcj.fluo.app.batch;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.data.Span;
+
+/**
+ * This class represents a batch order to delete all entries in the Fluo 
table indicated
+ * by the given Span and Column.  These batch orders are processed by the 
{@link BatchObserver},
+ * which uses this batch information along with the nodeId passed into the 
Observer to perform
+ * batch deletes.  
+ *
+ */
+public class SpanBatchDeleteInformation extends 
AbstractSpanBatchInformation {
+
+private static final BatchBindingSetUpdater updater = new 
SpanBatchBindingSetUpdater();
+
+public SpanBatchDeleteInformation(int batchSize, Column column, Span 
span) {
+super(batchSize, Task.Delete, column, span);
+}
+
+/**
+ * @return Updater that applies the {@link Task} to the given {@link 
Span} and {@link Column}
+ */
+@Override
+public BatchBindingSetUpdater getBatchUpdater() {
+return updater;
+}
+
+
+public static Builder builder() {
--- End diff --

I think this one is pretty clear.  Not providing class docs here.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service

2017-07-18 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/177#discussion_r127996635
  
--- Diff: 
extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/SpanBatchBindingSetUpdater.java
 ---
@@ -0,0 +1,128 @@
+package org.apache.rya.indexing.pcj.fluo.app.batch;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import java.util.Iterator;
+import java.util.Optional;
+
+import org.apache.fluo.api.client.TransactionBase;
+import org.apache.fluo.api.client.scanner.ColumnScanner;
+import org.apache.fluo.api.client.scanner.RowScanner;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.data.ColumnValue;
+import org.apache.fluo.api.data.RowColumn;
+import org.apache.fluo.api.data.Span;
+import org.apache.log4j.Logger;
+import org.apache.rya.indexing.pcj.fluo.app.batch.BatchInformation.Task;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * This class processes {@link SpanBatchDeleteInformation} objects by
+ * deleting the entries in the Fluo Column corresponding to the {@link 
Span}
+ * of the BatchInformation object.  This class will delete entries until 
the
+ * batch size is met, and then create a new SpanBatchDeleteInformation 
object
+ * with an updated Span whose starting point is the stopping point of this
+ * batch.  If the batch limit is not met, then a new batch is not created 
and
+ * the task is complete.
+ *
+ */
+public class SpanBatchBindingSetUpdater extends 
AbstractBatchBindingSetUpdater {
+
+private static final Logger log = 
Logger.getLogger(SpanBatchBindingSetUpdater.class);
+
+/**
+ * Process SpanBatchDeleteInformation objects by deleting all entries 
indicated
+ * by Span until batch limit is met.
+ * @param tx - Fluo Transaction
+ * @param row - Byte row identifying BatchInformation
+ * @param batch - SpanBatchDeleteInformation object to be processed
+ */
+@Override
+public void processBatch(TransactionBase tx, Bytes row, 
BatchInformation batch) throws Exception {
+super.processBatch(tx, row, batch);
+Preconditions.checkArgument(batch instanceof 
SpanBatchDeleteInformation);
+SpanBatchDeleteInformation spanBatch = 
(SpanBatchDeleteInformation) batch;
+Task task = spanBatch.getTask();
+int batchSize = spanBatch.getBatchSize();
+Span span = spanBatch.getSpan();
+Column column = batch.getColumn();
+Optional rowCol = Optional.empty();
+
+switch (task) {
+case Add:
+log.trace("The Task Add is not supported for 
SpanBatchBindingSetUpdater.  Batch " + batch + " will not be processed.");
--- End diff --

I don't think you want to kill the whole application here.  If an invalid 
batch request is made, the application simply logs that it can't process it, 
then moves on.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service

2017-07-18 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/177#discussion_r127996045
  
--- Diff: 
extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/JoinBatchInformation.java
 ---
@@ -0,0 +1,262 @@
+package org.apache.rya.indexing.pcj.fluo.app.batch;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import java.util.Objects;
+
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.data.Span;
+import org.apache.rya.indexing.pcj.fluo.app.JoinResultUpdater.Side;
+import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata.JoinType;
+import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
+import org.openrdf.query.Binding;
+
+import jline.internal.Preconditions;
+
+/**
+ * This class updates join results based on parameters specified for the 
join's
+ * children. The join has two children, and for one child a 
VisibilityBindingSet
+ * is specified along with the Side of that child. This BindingSet 
represents an
+ * update to that join child. For the other child, a Span, Column and
+ * VariableOrder are specified. This is so that the sibling node (the node 
that
+ * wasn't updated) can be scanned to obtain results that can be joined 
with the
+ * VisibilityBindingSet. The assumption here is that the Span is derived 
from
+ * the {@link Binding}s of common variables between the join children, with
+ * Values ordered according to the indicated {@link VariableOrder}. This 
class
+ * represents a batch order to perform a given task on join BindingSet 
results.
+ * The {@link Task} is to Add, Delete, or Update. This batch order is 
processed
+ * by the {@link BatchObserver} and used with the nodeId provided to the
+ * Observer to process the Task specified by the batch order. If the Task 
is to
+ * add, the BatchBindingSetUpdater returned by
+ * {@link JoinBatchInformation#getBatchUpdater()} will scan the join's 
child for
+ * results using the indicated Span and Column. These results are joined 
with
+ * the indicated VisibilityBindingSet, and the results are added to the 
parent
+ * join. The other Tasks are performed analogously.
+ *
+ */
+public class JoinBatchInformation extends AbstractSpanBatchInformation {
+
+private static final BatchBindingSetUpdater updater = new 
JoinBatchBindingSetUpdater();
+private VisibilityBindingSet bs; //update for join child indicated by 
side
+private VariableOrder varOrder; //variable order for child indicated 
by Span
+private Side side;  //join child that was updated by bs
+private JoinType join;
+/**
+ * @param batchSize - batch size that Tasks are performed in
+ * @param task - Add, Delete, or Update
+ * @param column - Column of join child to be scanned
+ * @param span - span of join child to be scanned (derived from common 
variables of left and right join children)
+ * @param bs - BindingSet to be joined with results of child scan
+ * @param varOrder - VariableOrder used to form join (order for join 
child corresponding to Span)
+ * @param side - The side of the child that the VisibilityBindingSet 
update occurred at
+ * @param join - JoinType (left, right, natural inner)
+ */
+public JoinBatchInformation(int batchSize, Task task, Column column, 
Span span, VisibilityBindingSet bs, VariableOrder varOrder, Side side, JoinType 
join) {
+super(batchSize, task, column, span);
+Preconditions.checkNotNull(bs);
+Preconditions.checkNotNull(varOrder);
+Preconditions.checkNotNull(side);
+Preconditions.checkNotNull(join);
+this.bs = bs;
+this.varOrder = varOrder;
+this.join = join;
+this.side = side;
+}
+
+public JoinBatchInformation(Task task, Colum

[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service

2017-07-18 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/177#discussion_r127995755
  
--- Diff: 
extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/JoinBatchInformation.java
 ---
@@ -0,0 +1,262 @@
+package org.apache.rya.indexing.pcj.fluo.app.batch;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import java.util.Objects;
+
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.data.Span;
+import org.apache.rya.indexing.pcj.fluo.app.JoinResultUpdater.Side;
+import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata.JoinType;
+import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
+import org.openrdf.query.Binding;
+
+import jline.internal.Preconditions;
+
+/**
+ * This class updates join results based on parameters specified for the 
join's
+ * children. The join has two children, and for one child a 
VisibilityBindingSet
+ * is specified along with the Side of that child. This BindingSet 
represents an
+ * update to that join child. For the other child, a Span, Column and
+ * VariableOrder are specified. This is so that the sibling node (the node 
that
+ * wasn't updated) can be scanned to obtain results that can be joined 
with the
+ * VisibilityBindingSet. The assumption here is that the Span is derived 
from
+ * the {@link Binding}s of common variables between the join children, with
+ * Values ordered according to the indicated {@link VariableOrder}. This 
class
+ * represents a batch order to perform a given task on join BindingSet 
results.
+ * The {@link Task} is to Add, Delete, or Update. This batch order is 
processed
+ * by the {@link BatchObserver} and used with the nodeId provided to the
+ * Observer to process the Task specified by the batch order. If the Task 
is to
+ * add, the BatchBindingSetUpdater returned by
+ * {@link JoinBatchInformation#getBatchUpdater()} will scan the join's 
child for
+ * results using the indicated Span and Column. These results are joined 
with
+ * the indicated VisibilityBindingSet, and the results are added to the 
parent
+ * join. The other Tasks are performed analogously.
+ *
+ */
+public class JoinBatchInformation extends AbstractSpanBatchInformation {
+
+private static final BatchBindingSetUpdater updater = new 
JoinBatchBindingSetUpdater();
+private VisibilityBindingSet bs; //update for join child indicated by 
side
+private VariableOrder varOrder; //variable order for child indicated 
by Span
+private Side side;  //join child that was updated by bs
+private JoinType join;
+/**
+ * @param batchSize - batch size that Tasks are performed in
+ * @param task - Add, Delete, or Update
+ * @param column - Column of join child to be scanned
+ * @param span - span of join child to be scanned (derived from common 
variables of left and right join children)
+ * @param bs - BindingSet to be joined with results of child scan
+ * @param varOrder - VariableOrder used to form join (order for join 
child corresponding to Span)
+ * @param side - The side of the child that the VisibilityBindingSet 
update occurred at
+ * @param join - JoinType (left, right, natural inner)
+ */
+public JoinBatchInformation(int batchSize, Task task, Column column, 
Span span, VisibilityBindingSet bs, VariableOrder varOrder, Side side, JoinType 
join) {
+super(batchSize, task, column, span);
+Preconditions.checkNotNull(bs);
+Preconditions.checkNotNull(varOrder);
+Preconditions.checkNotNull(side);
+Preconditions.checkNotNull(join);
+this.bs = bs;
+this.varOrder = varOrder;
+this.join = join;
+this.side = side;
+}
+
+public JoinBatchInformation(Task task, Colum

[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service

2017-07-18 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/177#discussion_r127995235
  
--- Diff: 
extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/JoinBatchInformation.java
 ---
@@ -0,0 +1,262 @@
+package org.apache.rya.indexing.pcj.fluo.app.batch;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import java.util.Objects;
+
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.data.Span;
+import org.apache.rya.indexing.pcj.fluo.app.JoinResultUpdater.Side;
+import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata.JoinType;
+import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
+import org.openrdf.query.Binding;
+
+import jline.internal.Preconditions;
+
+/**
+ * This class updates join results based on parameters specified for the 
join's
+ * children. The join has two children, and for one child a 
VisibilityBindingSet
+ * is specified along with the Side of that child. This BindingSet 
represents an
+ * update to that join child. For the other child, a Span, Column and
+ * VariableOrder are specified. This is so that the sibling node (the node 
that
+ * wasn't updated) can be scanned to obtain results that can be joined 
with the
+ * VisibilityBindingSet. The assumption here is that the Span is derived 
from
+ * the {@link Binding}s of common variables between the join children, with
+ * Values ordered according to the indicated {@link VariableOrder}. This 
class
+ * represents a batch order to perform a given task on join BindingSet 
results.
+ * The {@link Task} is to Add, Delete, or Update. This batch order is 
processed
+ * by the {@link BatchObserver} and used with the nodeId provided to the
+ * Observer to process the Task specified by the batch order. If the Task 
is to
+ * add, the BatchBindingSetUpdater returned by
+ * {@link JoinBatchInformation#getBatchUpdater()} will scan the join's 
child for
+ * results using the indicated Span and Column. These results are joined 
with
+ * the indicated VisibilityBindingSet, and the results are added to the 
parent
+ * join. The other Tasks are performed analogously.
+ *
+ */
+public class JoinBatchInformation extends AbstractSpanBatchInformation {
+
+private static final BatchBindingSetUpdater updater = new 
JoinBatchBindingSetUpdater();
+private VisibilityBindingSet bs; //update for join child indicated by 
side
+private VariableOrder varOrder; //variable order for child indicated 
by Span
+private Side side;  //join child that was updated by bs
+private JoinType join;
+/**
+ * @param batchSize - batch size that Tasks are performed in
+ * @param task - Add, Delete, or Update
+ * @param column - Column of join child to be scanned
+ * @param span - span of join child to be scanned (derived from common 
variables of left and right join children)
+ * @param bs - BindingSet to be joined with results of child scan
+ * @param varOrder - VariableOrder used to form join (order for join 
child corresponding to Span)
+ * @param side - The side of the child that the VisibilityBindingSet 
update occurred at
+ * @param join - JoinType (left, right, natural inner)
+ */
+public JoinBatchInformation(int batchSize, Task task, Column column, 
Span span, VisibilityBindingSet bs, VariableOrder varOrder, Side side, JoinType 
join) {
+super(batchSize, task, column, span);
+Preconditions.checkNotNull(bs);
+Preconditions.checkNotNull(varOrder);
+Preconditions.checkNotNull(side);
+Preconditions.checkNotNull(join);
+this.bs = bs;
--- End diff --

done


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not ha

[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service

2017-07-18 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/177#discussion_r127995043
  
--- Diff: 
extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/BasicBatchInformation.java
 ---
@@ -0,0 +1,82 @@
+package org.apache.rya.indexing.pcj.fluo.app.batch;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import org.apache.fluo.api.data.Column;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * This class contains all of the common info contained in other 
implementations
+ * of BatchInformation.
+ *
+ */
+public abstract class BasicBatchInformation implements BatchInformation {
+
+private int batchSize;
+private Task task;
+private Column column;
+
+/**
+ * Create BasicBatchInformation object
+ * @param batchSize - size of batch to be processed
+ * @param task - task to be processed
+ * @param column - Column in which data is proessed
+ */
+public BasicBatchInformation(int batchSize, Task task, Column column ) 
{
+Preconditions.checkNotNull(task);
--- End diff --

done


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service

2017-07-18 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/177#discussion_r127994577
  
--- Diff: 
extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/AbstractSpanBatchInformation.java
 ---
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.batch;
+
+import java.util.Objects;
+
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.data.Span;
+
+import jline.internal.Preconditions;
+
+/**
+ * Abstract class for generating span based notifications.  A spanned 
notification
+ * uses a {@link Span} to begin processing a Fluo Column at the position 
designated by the Span.
+ *
+ */
+public abstract class AbstractSpanBatchInformation extends 
BasicBatchInformation {
+
+private Span span;
+
+/**
+ * Create AbstractBatchInformation
+ * @param batchSize - size of batch to be processed
+ * @param task - type of task processed (Add, Delete, Udpate)
+ * @param column - Cpolumn that Span notification is applied
+ * @param span - span used to indicate where processing should begin
+ */
+public AbstractSpanBatchInformation(int batchSize, Task task, Column 
column, Span span) {
+super(batchSize, task, column);
+Preconditions.checkNotNull(span);
+this.span = span;
+}
+
+public AbstractSpanBatchInformation(Task task, Column column, Span 
span) {
+this(DEFAULT_BATCH_SIZE, task, column, span);
+}
+
+/**
+ * @return Span that batch Task will be applied to
+ */
+public Span getSpan() {
+return span;
+}
+
+/**
+ * Sets span to which batch Task will be applied
+ * @param span
+ */
+public void setSpan(Span span) {
+this.span = span;
+}
+
+@Override
+public String toString() {
+return new StringBuilder()
+.append("Span Batch Information {\n")
+.append("Span: " + span + "\n")
+.append("Batch Size: " + super.getBatchSize() + "\n")
+.append("Task: " + super.getTask() + "\n")
+.append("Column: " + super.getColumn() + "\n")
+.append("}")
+.toString();
+}
+
+@Override
+public boolean equals(Object other) {
+if (this == other) {
+return true;
+}
+
+if (!(other instanceof AbstractSpanBatchInformation)) {
+return false;
+}
+
+AbstractSpanBatchInformation batch = 
(AbstractSpanBatchInformation) other;
+return (super.getBatchSize() == batch.getBatchSize()) && 
Objects.equals(super.getColumn(), batch.getColumn()) && 
Objects.equals(this.span, batch.span)
+&& Objects.equals(super.getTask(), batch.getTask());
+}
+
+@Override
+public int hashCode() {
+int result = 17;
--- End diff --

done


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service

2017-07-18 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/177#discussion_r127993938
  
--- Diff: 
extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/AbstractSpanBatchInformation.java
 ---
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.batch;
+
+import java.util.Objects;
+
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.data.Span;
+
+import jline.internal.Preconditions;
+
+/**
+ * Abstract class for generating span based notifications.  A spanned 
notification
+ * uses a {@link Span} to begin processing a Fluo Column at the position 
designated by the Span.
+ *
+ */
+public abstract class AbstractSpanBatchInformation extends 
BasicBatchInformation {
+
+private Span span;
+
+/**
+ * Create AbstractBatchInformation
+ * @param batchSize - size of batch to be processed
+ * @param task - type of task processed (Add, Delete, Udpate)
+ * @param column - Cpolumn that Span notification is applied
+ * @param span - span used to indicate where processing should begin
+ */
+public AbstractSpanBatchInformation(int batchSize, Task task, Column 
column, Span span) {
+super(batchSize, task, column);
+Preconditions.checkNotNull(span);
--- End diff --

done


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service

2017-07-18 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/177#discussion_r127993723
  
--- Diff: 
extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/AbstractBatchBindingSetUpdater.java
 ---
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.batch;
+
+import org.apache.fluo.api.client.TransactionBase;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.RowColumn;
+import org.apache.fluo.api.data.Span;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
+
+public abstract class AbstractBatchBindingSetUpdater implements 
BatchBindingSetUpdater {
--- End diff --

done


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service

2017-07-18 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/177#discussion_r127993037
  
--- Diff: 
extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/JoinResultUpdater.java
 ---
@@ -43,6 +43,7 @@
 import 
org.apache.rya.indexing.pcj.storage.accumulo.BindingSetConverter.BindingSetConversionException;
 import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
 import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
+import 
org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSetSerDe;
--- End diff --

yep


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service

2017-07-18 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/177#discussion_r127992511
  
--- Diff: 
extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/DeletePcj.java
 ---
@@ -67,7 +68,8 @@
 /**
  * Constructs an instance of {@link DeletePcj}.
  *
- * @param batchSize - The number of entries that will be deleted at a 
time. (> 0)
+ * @param batchSize
+ *- The number of entries that will be deleted at a time. 
(> 0)
--- End diff --

done


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service

2017-07-18 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/177#discussion_r127990899
  
--- Diff: 
extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/VisibilityBindingSetSerDe.java
 ---
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.storage.accumulo;
+
+import static java.util.Objects.requireNonNull;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+
+import org.apache.fluo.api.data.Bytes;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Serializes and deserializes a {@link VisibilityBindingSet} to and from 
{@link Bytes} objects.
+ */
+@DefaultAnnotation(NonNull.class)
+public class VisibilityBindingSetSerDe {
--- End diff --

This is actually a duplicate class.  I think I needed to migrate the 
original to this project, but forgot to delete the original. Deleted the 
original.  Not my class, so I didn't decide on the naming conventions here.  
Leaving as is.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service

2017-07-18 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/177#discussion_r127988392
  
--- Diff: 
extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/PeriodicQueryTableNameFactory.java
 ---
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.storage.accumulo;
+
+import static java.util.Objects.requireNonNull;
+
+import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
+
+/**
+ * Class for creating the names of {@link PeriodicQueryResultStorage} 
tables.
+ *
+ */
+public class PeriodicQueryTableNameFactory {
+
+public static final String PeriodicTableSuffix = "PERIODIC_QUERY_";
--- End diff --

It actually is a suffix. This follows the rya prefix in the table name.  A 
UUID is appended after this in the table name, which is why the underscore is 
added.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service

2017-07-18 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/177#discussion_r127987608
  
--- Diff: 
extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTables.java
 ---
@@ -412,9 +413,9 @@ private void writeResults(
 // Row ID = binding set values, Column Family = variable 
order of the binding set.
 final Mutation addResult = new Mutation(rowKey);
 final String visibility = result.getVisibility();
-addResult.put(varOrder.toString(), "", new 
ColumnVisibility(visibility), "");
+addResult.put(varOrder.toString(), "", new 
ColumnVisibility(visibility), new Value(bsSerDe.serialize(result).toArray()));
 mutations.add(addResult);
-} catch(final BindingSetConversionException e) {
+} catch(Exception e) {
--- End diff --

Just converted back to the original Exception.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service

2017-07-18 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/177#discussion_r127986867
  
--- Diff: 
extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/AccumuloPeriodicQueryResultStorage.java
 ---
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.storage.accumulo;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchDeleter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.hadoop.io.Text;
+import org.apache.rya.indexing.pcj.storage.PCJIdFactory;
+import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
+import org.apache.rya.indexing.pcj.storage.PeriodicQueryStorageException;
+import org.apache.rya.indexing.pcj.storage.PeriodicQueryStorageMetadata;
+import 
org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator;
+import 
org.apache.rya.indexing.pcj.storage.accumulo.BindingSetConverter.BindingSetConversionException;
+import org.openrdf.model.impl.LiteralImpl;
+import org.openrdf.model.vocabulary.XMLSchema;
+import org.openrdf.query.BindingSet;
+import org.openrdf.query.MalformedQueryException;
+import org.openrdf.query.algebra.AggregateOperatorBase;
+import org.openrdf.query.algebra.ExtensionElem;
+import org.openrdf.query.algebra.TupleExpr;
+import org.openrdf.query.algebra.evaluation.QueryBindingSet;
+import org.openrdf.query.algebra.helpers.QueryModelVisitorBase;
+import org.openrdf.query.parser.sparql.SPARQLParser;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * This class is the Accumulo implementation of {@link 
PeriodicQueryResultStorage} for
+ * creating, deleting, and interacting with tables where PeriodicQuery 
results are stored.
+ */
+public class AccumuloPeriodicQueryResultStorage implements 
PeriodicQueryResultStorage {
+
+private String ryaInstance;
+private Connector accumuloConn;
+private Authorizations auths;
+private final PCJIdFactory pcjIdFactory = new PCJIdFactory();
+private final AccumuloPcjSerializer converter = new 
AccumuloPcjSerializer();
+private static final PcjTables pcjTables = new PcjTables();
+private static final PeriodicQueryTableNameFactory tableNameFactory = 
new PeriodicQueryTableNameFactory();
+
+/**
+ * Creates a AccumuloPeriodicQueryResultStorage Object.
+ * @param accumuloConn - Accumulo Connector for connecting to an 
Accumulo instance
+ * @param ryaInstance - Rya Instance name for connecting to Rya
+ */
+public AccumuloPeriodicQueryResultStorage(Connector accumuloConn, 
String ryaInstance) {
+this.accumuloConn = accumuloConn;
+this.ryaInstance = ryaInstance;
+String user = accumuloConn.whoami();
+try {
+this.auths = 
accumuloConn.securityOperations().getUserAuthorizations(user);
+} catch (AccumuloException | AccumuloSecurityException e) {
+throw new RuntimeException("Unable access user: " + user + 
"authorizations.");
+}
+}
+
+@Override
+public String createPeriodicQuery(String sparql) throws 
PeriodicQueryStorageException {
+Preconditions.checkNotNull(sparql);
+String queryId = pcjIdFactory.nextId();
+r

[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service

2017-07-18 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/177#discussion_r127986607
  
--- Diff: 
extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/AccumuloPeriodicQueryResultStorage.java
 ---
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.storage.accumulo;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchDeleter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.hadoop.io.Text;
+import org.apache.rya.indexing.pcj.storage.PCJIdFactory;
+import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
+import org.apache.rya.indexing.pcj.storage.PeriodicQueryStorageException;
+import org.apache.rya.indexing.pcj.storage.PeriodicQueryStorageMetadata;
+import 
org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator;
+import 
org.apache.rya.indexing.pcj.storage.accumulo.BindingSetConverter.BindingSetConversionException;
+import org.openrdf.model.impl.LiteralImpl;
+import org.openrdf.model.vocabulary.XMLSchema;
+import org.openrdf.query.BindingSet;
+import org.openrdf.query.MalformedQueryException;
+import org.openrdf.query.algebra.AggregateOperatorBase;
+import org.openrdf.query.algebra.ExtensionElem;
+import org.openrdf.query.algebra.TupleExpr;
+import org.openrdf.query.algebra.evaluation.QueryBindingSet;
+import org.openrdf.query.algebra.helpers.QueryModelVisitorBase;
+import org.openrdf.query.parser.sparql.SPARQLParser;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * This class is the Accumulo implementation of {@link 
PeriodicQueryResultStorage} for
+ * creating, deleting, and interacting with tables where PeriodicQuery 
results are stored.
+ */
+public class AccumuloPeriodicQueryResultStorage implements 
PeriodicQueryResultStorage {
+
+private String ryaInstance;
+private Connector accumuloConn;
+private Authorizations auths;
+private final PCJIdFactory pcjIdFactory = new PCJIdFactory();
+private final AccumuloPcjSerializer converter = new 
AccumuloPcjSerializer();
+private static final PcjTables pcjTables = new PcjTables();
+private static final PeriodicQueryTableNameFactory tableNameFactory = 
new PeriodicQueryTableNameFactory();
+
+/**
+ * Creates a AccumuloPeriodicQueryResultStorage Object.
+ * @param accumuloConn - Accumulo Connector for connecting to an 
Accumulo instance
+ * @param ryaInstance - Rya Instance name for connecting to Rya
+ */
+public AccumuloPeriodicQueryResultStorage(Connector accumuloConn, 
String ryaInstance) {
+this.accumuloConn = accumuloConn;
+this.ryaInstance = ryaInstance;
+String user = accumuloConn.whoami();
+try {
+this.auths = 
accumuloConn.securityOperations().getUserAuthorizations(user);
+} catch (AccumuloException | AccumuloSecurityException e) {
+throw new RuntimeException("Unable access user: " + user + 
"authorizations.");
+}
+}
+
+@Override
+public String createPeriodicQuery(String sparql) throws 
PeriodicQueryStorageException {
+Preconditions.checkNotNull(sparql);
+String queryId = pcjIdFactory.nextId();
+r

[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service

2017-07-18 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/177#discussion_r127984352
  
--- Diff: 
extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/AccumuloPeriodicQueryResultStorage.java
 ---
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.storage.accumulo;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchDeleter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.hadoop.io.Text;
+import org.apache.rya.indexing.pcj.storage.PCJIdFactory;
+import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
+import org.apache.rya.indexing.pcj.storage.PeriodicQueryStorageException;
+import org.apache.rya.indexing.pcj.storage.PeriodicQueryStorageMetadata;
+import 
org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator;
+import 
org.apache.rya.indexing.pcj.storage.accumulo.BindingSetConverter.BindingSetConversionException;
+import org.openrdf.model.impl.LiteralImpl;
+import org.openrdf.model.vocabulary.XMLSchema;
+import org.openrdf.query.BindingSet;
+import org.openrdf.query.MalformedQueryException;
+import org.openrdf.query.algebra.AggregateOperatorBase;
+import org.openrdf.query.algebra.ExtensionElem;
+import org.openrdf.query.algebra.TupleExpr;
+import org.openrdf.query.algebra.evaluation.QueryBindingSet;
+import org.openrdf.query.algebra.helpers.QueryModelVisitorBase;
+import org.openrdf.query.parser.sparql.SPARQLParser;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * This class is the Accumulo implementation of {@link 
PeriodicQueryResultStorage} for
+ * creating, deleting, and interacting with tables where PeriodicQuery 
results are stored.
+ */
+public class AccumuloPeriodicQueryResultStorage implements 
PeriodicQueryResultStorage {
+
+private String ryaInstance;
+private Connector accumuloConn;
+private Authorizations auths;
+private final PCJIdFactory pcjIdFactory = new PCJIdFactory();
+private final AccumuloPcjSerializer converter = new 
AccumuloPcjSerializer();
+private static final PcjTables pcjTables = new PcjTables();
+private static final PeriodicQueryTableNameFactory tableNameFactory = 
new PeriodicQueryTableNameFactory();
+
+/**
+ * Creates a AccumuloPeriodicQueryResultStorage Object.
+ * @param accumuloConn - Accumulo Connector for connecting to an 
Accumulo instance
+ * @param ryaInstance - Rya Instance name for connecting to Rya
+ */
+public AccumuloPeriodicQueryResultStorage(Connector accumuloConn, 
String ryaInstance) {
+this.accumuloConn = accumuloConn;
+this.ryaInstance = ryaInstance;
+String user = accumuloConn.whoami();
+try {
+this.auths = 
accumuloConn.securityOperations().getUserAuthorizations(user);
+} catch (AccumuloException | AccumuloSecurityException e) {
+throw new RuntimeException("Unable access user: " + user + 
"authorizations.");
+}
+}
+
+@Override
+public String createPeriodicQuery(String sparql) throws 
PeriodicQueryStorageException {
+Preconditions.checkNotNull(sparql);
+String queryId = pcjIdFactory.nextId();
+r

[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service

2017-07-18 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/177#discussion_r127981764
  
--- Diff: 
extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/AccumuloPeriodicQueryResultStorage.java
 ---
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.storage.accumulo;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchDeleter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.hadoop.io.Text;
+import org.apache.rya.indexing.pcj.storage.PCJIdFactory;
+import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
+import org.apache.rya.indexing.pcj.storage.PeriodicQueryStorageException;
+import org.apache.rya.indexing.pcj.storage.PeriodicQueryStorageMetadata;
+import 
org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator;
+import 
org.apache.rya.indexing.pcj.storage.accumulo.BindingSetConverter.BindingSetConversionException;
+import org.openrdf.model.impl.LiteralImpl;
+import org.openrdf.model.vocabulary.XMLSchema;
+import org.openrdf.query.BindingSet;
+import org.openrdf.query.MalformedQueryException;
+import org.openrdf.query.algebra.AggregateOperatorBase;
+import org.openrdf.query.algebra.ExtensionElem;
+import org.openrdf.query.algebra.TupleExpr;
+import org.openrdf.query.algebra.evaluation.QueryBindingSet;
+import org.openrdf.query.algebra.helpers.QueryModelVisitorBase;
+import org.openrdf.query.parser.sparql.SPARQLParser;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * This class is the Accumulo implementation of {@link 
PeriodicQueryResultStorage} for
+ * creating, deleting, and interacting with tables where PeriodicQuery 
results are stored.
+ */
+public class AccumuloPeriodicQueryResultStorage implements 
PeriodicQueryResultStorage {
+
+private String ryaInstance;
+private Connector accumuloConn;
+private Authorizations auths;
+private final PCJIdFactory pcjIdFactory = new PCJIdFactory();
+private final AccumuloPcjSerializer converter = new 
AccumuloPcjSerializer();
+private static final PcjTables pcjTables = new PcjTables();
+private static final PeriodicQueryTableNameFactory tableNameFactory = 
new PeriodicQueryTableNameFactory();
+
+/**
+ * Creates a AccumuloPeriodicQueryResultStorage Object.
+ * @param accumuloConn - Accumulo Connector for connecting to an 
Accumulo instance
+ * @param ryaInstance - Rya Instance name for connecting to Rya
+ */
+public AccumuloPeriodicQueryResultStorage(Connector accumuloConn, 
String ryaInstance) {
+this.accumuloConn = accumuloConn;
--- End diff --

done


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service

2017-07-18 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/177#discussion_r127981484
  
--- Diff: 
extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/PeriodicQueryStorageMetadata.java
 ---
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.storage;
+
+import java.util.Objects;
+
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
+
+import com.google.common.base.Preconditions;
+
+/**
+ *  Metadata for a given PeriodicQueryStorage table. 
+ */
+public class PeriodicQueryStorageMetadata {
+
+private String sparql;
+private VariableOrder varOrder;
+
+/**
+ * Create a PeriodicQueryStorageMetadat object
+ * @param sparql - SPARQL query whose results are stored in table
+ * @param varOrder - order that BindingSet values are written in in 
table
+ */
+public PeriodicQueryStorageMetadata(String sparql, VariableOrder 
varOrder) {
+Preconditions.checkNotNull(sparql);
--- End diff --

done


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service

2017-07-18 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/177#discussion_r127981314
  
--- Diff: 
extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/PeriodicQueryStorageException.java
 ---
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.storage;
+
+public class PeriodicQueryStorageException extends Exception {
--- End diff --

done


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service

2017-07-17 Thread isper3at
Github user isper3at commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/177#discussion_r127736780
  
--- Diff: 
extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/AccumuloPeriodicQueryResultStorage.java
 ---
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.storage.accumulo;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchDeleter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.hadoop.io.Text;
+import org.apache.rya.indexing.pcj.storage.PCJIdFactory;
+import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
+import org.apache.rya.indexing.pcj.storage.PeriodicQueryStorageException;
+import org.apache.rya.indexing.pcj.storage.PeriodicQueryStorageMetadata;
+import 
org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator;
+import 
org.apache.rya.indexing.pcj.storage.accumulo.BindingSetConverter.BindingSetConversionException;
+import org.openrdf.model.impl.LiteralImpl;
+import org.openrdf.model.vocabulary.XMLSchema;
+import org.openrdf.query.BindingSet;
+import org.openrdf.query.MalformedQueryException;
+import org.openrdf.query.algebra.AggregateOperatorBase;
+import org.openrdf.query.algebra.ExtensionElem;
+import org.openrdf.query.algebra.TupleExpr;
+import org.openrdf.query.algebra.evaluation.QueryBindingSet;
+import org.openrdf.query.algebra.helpers.QueryModelVisitorBase;
+import org.openrdf.query.parser.sparql.SPARQLParser;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * This class is the Accumulo implementation of {@link 
PeriodicQueryResultStorage} for
+ * creating, deleting, and interacting with tables where PeriodicQuery 
results are stored.
+ */
+public class AccumuloPeriodicQueryResultStorage implements 
PeriodicQueryResultStorage {
+
+private String ryaInstance;
+private Connector accumuloConn;
+private Authorizations auths;
+private final PCJIdFactory pcjIdFactory = new PCJIdFactory();
+private final AccumuloPcjSerializer converter = new 
AccumuloPcjSerializer();
+private static final PcjTables pcjTables = new PcjTables();
+private static final PeriodicQueryTableNameFactory tableNameFactory = 
new PeriodicQueryTableNameFactory();
+
+/**
+ * Creates a AccumuloPeriodicQueryResultStorage Object.
+ * @param accumuloConn - Accumulo Connector for connecting to an 
Accumulo instance
+ * @param ryaInstance - Rya Instance name for connecting to Rya
+ */
+public AccumuloPeriodicQueryResultStorage(Connector accumuloConn, 
String ryaInstance) {
+this.accumuloConn = accumuloConn;
+this.ryaInstance = ryaInstance;
+String user = accumuloConn.whoami();
+try {
+this.auths = 
accumuloConn.securityOperations().getUserAuthorizations(user);
+} catch (AccumuloException | AccumuloSecurityException e) {
+throw new RuntimeException("Unable access user: " + user + 
"authorizations.");
+}
+}
+
+@Override
+public String createPeriodicQuery(String sparql) throws 
PeriodicQueryStorageException {
+Preconditions.checkNotNull(sparql);
+String queryId = pcjIdFactory.nextId();
+ret

[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service

2017-07-17 Thread isper3at
Github user isper3at commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/177#discussion_r127734223
  
--- Diff: 
extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/AccumuloPeriodicQueryResultStorage.java
 ---
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.storage.accumulo;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchDeleter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.hadoop.io.Text;
+import org.apache.rya.indexing.pcj.storage.PCJIdFactory;
+import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
+import org.apache.rya.indexing.pcj.storage.PeriodicQueryStorageException;
+import org.apache.rya.indexing.pcj.storage.PeriodicQueryStorageMetadata;
+import 
org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator;
+import 
org.apache.rya.indexing.pcj.storage.accumulo.BindingSetConverter.BindingSetConversionException;
+import org.openrdf.model.impl.LiteralImpl;
+import org.openrdf.model.vocabulary.XMLSchema;
+import org.openrdf.query.BindingSet;
+import org.openrdf.query.MalformedQueryException;
+import org.openrdf.query.algebra.AggregateOperatorBase;
+import org.openrdf.query.algebra.ExtensionElem;
+import org.openrdf.query.algebra.TupleExpr;
+import org.openrdf.query.algebra.evaluation.QueryBindingSet;
+import org.openrdf.query.algebra.helpers.QueryModelVisitorBase;
+import org.openrdf.query.parser.sparql.SPARQLParser;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * This class is the Accumulo implementation of {@link 
PeriodicQueryResultStorage} for
+ * creating, deleting, and interacting with tables where PeriodicQuery 
results are stored.
+ */
+public class AccumuloPeriodicQueryResultStorage implements 
PeriodicQueryResultStorage {
+
+private String ryaInstance;
+private Connector accumuloConn;
+private Authorizations auths;
+private final PCJIdFactory pcjIdFactory = new PCJIdFactory();
+private final AccumuloPcjSerializer converter = new 
AccumuloPcjSerializer();
+private static final PcjTables pcjTables = new PcjTables();
+private static final PeriodicQueryTableNameFactory tableNameFactory = 
new PeriodicQueryTableNameFactory();
+
+/**
+ * Creates a AccumuloPeriodicQueryResultStorage Object.
+ * @param accumuloConn - Accumulo Connector for connecting to an 
Accumulo instance
+ * @param ryaInstance - Rya Instance name for connecting to Rya
+ */
+public AccumuloPeriodicQueryResultStorage(Connector accumuloConn, 
String ryaInstance) {
+this.accumuloConn = accumuloConn;
+this.ryaInstance = ryaInstance;
+String user = accumuloConn.whoami();
+try {
+this.auths = 
accumuloConn.securityOperations().getUserAuthorizations(user);
+} catch (AccumuloException | AccumuloSecurityException e) {
+throw new RuntimeException("Unable access user: " + user + 
"authorizations.");
+}
+}
+
+@Override
+public String createPeriodicQuery(String sparql) throws 
PeriodicQueryStorageException {
+Preconditions.checkNotNull(sparql);
+String queryId = pcjIdFactory.nextId();
+ret

[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service

2017-07-17 Thread isper3at
Github user isper3at commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/177#discussion_r127732860
  
--- Diff: 
extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/AccumuloPeriodicQueryResultStorage.java
 ---
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.storage.accumulo;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchDeleter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.hadoop.io.Text;
+import org.apache.rya.indexing.pcj.storage.PCJIdFactory;
+import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
+import org.apache.rya.indexing.pcj.storage.PeriodicQueryStorageException;
+import org.apache.rya.indexing.pcj.storage.PeriodicQueryStorageMetadata;
+import 
org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator;
+import 
org.apache.rya.indexing.pcj.storage.accumulo.BindingSetConverter.BindingSetConversionException;
+import org.openrdf.model.impl.LiteralImpl;
+import org.openrdf.model.vocabulary.XMLSchema;
+import org.openrdf.query.BindingSet;
+import org.openrdf.query.MalformedQueryException;
+import org.openrdf.query.algebra.AggregateOperatorBase;
+import org.openrdf.query.algebra.ExtensionElem;
+import org.openrdf.query.algebra.TupleExpr;
+import org.openrdf.query.algebra.evaluation.QueryBindingSet;
+import org.openrdf.query.algebra.helpers.QueryModelVisitorBase;
+import org.openrdf.query.parser.sparql.SPARQLParser;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * This class is the Accumulo implementation of {@link 
PeriodicQueryResultStorage} for
+ * creating, deleting, and interacting with tables where PeriodicQuery 
results are stored.
+ */
+public class AccumuloPeriodicQueryResultStorage implements 
PeriodicQueryResultStorage {
+
+private String ryaInstance;
+private Connector accumuloConn;
+private Authorizations auths;
+private final PCJIdFactory pcjIdFactory = new PCJIdFactory();
+private final AccumuloPcjSerializer converter = new 
AccumuloPcjSerializer();
+private static final PcjTables pcjTables = new PcjTables();
+private static final PeriodicQueryTableNameFactory tableNameFactory = 
new PeriodicQueryTableNameFactory();
+
+/**
+ * Creates a AccumuloPeriodicQueryResultStorage Object.
+ * @param accumuloConn - Accumulo Connector for connecting to an 
Accumulo instance
+ * @param ryaInstance - Rya Instance name for connecting to Rya
+ */
+public AccumuloPeriodicQueryResultStorage(Connector accumuloConn, 
String ryaInstance) {
+this.accumuloConn = accumuloConn;
--- End diff --

precondition checks


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service

2017-07-17 Thread isper3at
Github user isper3at commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/177#discussion_r127799075
  
--- Diff: 
extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/processor/TimestampedNotificationProcessor.java
 ---
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.processor;
+
+import java.util.Optional;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.log4j.Logger;
+import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
+import 
org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator;
+import org.apache.rya.periodic.notification.api.BinPruner;
+import org.apache.rya.periodic.notification.api.NodeBin;
+import org.apache.rya.periodic.notification.api.NotificationProcessor;
+import org.apache.rya.periodic.notification.exporter.BindingSetRecord;
+import 
org.apache.rya.periodic.notification.exporter.KafkaPeriodicBindingSetExporter;
+import 
org.apache.rya.periodic.notification.notification.TimestampedNotification;
+import org.openrdf.query.BindingSet;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Implementation of {@link NotificationProcessor} that uses the id 
indicated by
+ * the {@link TimestampedNotification} to obtain results from the
+ * {@link PeriodicQueryResultStorage} layer containing the results of the
+ * Periodic Query. The TimestampedNotificationProcessor then parses the 
results
+ * and adds them to work queues to be processed by the {@link BinPruner} 
and the
+ * {@link KafkaPeriodicBindingSetExporter}.
+ *
+ */
+public class TimestampedNotificationProcessor implements 
NotificationProcessor, Runnable {
+
+private static final Logger log = 
Logger.getLogger(TimestampedNotificationProcessor.class);
+private PeriodicQueryResultStorage periodicStorage;
+private BlockingQueue notifications; // 
notifications
+  // to 
process
+private BlockingQueue bins; // entries to delete from Fluo
+private BlockingQueue bindingSets; // query results 
to export
+private AtomicBoolean closed = new AtomicBoolean(false);
+private int threadNumber;
+
+
+public TimestampedNotificationProcessor(PeriodicQueryResultStorage 
periodicStorage,
+BlockingQueue notifications, 
BlockingQueue bins, BlockingQueue bindingSets,
+int threadNumber) {
+Preconditions.checkNotNull(notifications);
+Preconditions.checkNotNull(bins);
+Preconditions.checkNotNull(bindingSets);
--- End diff --

here


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service

2017-07-17 Thread isper3at
Github user isper3at commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/177#discussion_r127754845
  
--- Diff: 
extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/AbstractSpanBatchInformation.java
 ---
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.batch;
+
+import java.util.Objects;
+
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.data.Span;
+
+import jline.internal.Preconditions;
+
+/**
+ * Abstract class for generating span based notifications.  A spanned 
notification
+ * uses a {@link Span} to begin processing a Fluo Column at the position 
designated by the Span.
+ *
+ */
+public abstract class AbstractSpanBatchInformation extends 
BasicBatchInformation {
+
+private Span span;
+
+/**
+ * Create AbstractBatchInformation
+ * @param batchSize - size of batch to be processed
+ * @param task - type of task processed (Add, Delete, Udpate)
+ * @param column - Cpolumn that Span notification is applied
+ * @param span - span used to indicate where processing should begin
+ */
+public AbstractSpanBatchInformation(int batchSize, Task task, Column 
column, Span span) {
+super(batchSize, task, column);
+Preconditions.checkNotNull(span);
+this.span = span;
+}
+
+public AbstractSpanBatchInformation(Task task, Column column, Span 
span) {
+this(DEFAULT_BATCH_SIZE, task, column, span);
+}
+
+/**
+ * @return Span that batch Task will be applied to
+ */
+public Span getSpan() {
+return span;
+}
+
+/**
+ * Sets span to which batch Task will be applied
+ * @param span
+ */
+public void setSpan(Span span) {
+this.span = span;
+}
+
+@Override
+public String toString() {
+return new StringBuilder()
+.append("Span Batch Information {\n")
+.append("Span: " + span + "\n")
+.append("Batch Size: " + super.getBatchSize() + "\n")
+.append("Task: " + super.getTask() + "\n")
+.append("Column: " + super.getColumn() + "\n")
+.append("}")
+.toString();
+}
+
+@Override
+public boolean equals(Object other) {
+if (this == other) {
+return true;
+}
+
+if (!(other instanceof AbstractSpanBatchInformation)) {
+return false;
+}
+
+AbstractSpanBatchInformation batch = 
(AbstractSpanBatchInformation) other;
+return (super.getBatchSize() == batch.getBatchSize()) && 
Objects.equals(super.getColumn(), batch.getColumn()) && 
Objects.equals(this.span, batch.span)
+&& Objects.equals(super.getTask(), batch.getTask());
+}
+
+@Override
+public int hashCode() {
+int result = 17;
--- End diff --

use Objects.hash()


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service

2017-07-17 Thread isper3at
Github user isper3at commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/177#discussion_r127798539
  
--- Diff: 
extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/notification/PeriodicNotification.java
 ---
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.notification;
+
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.rya.periodic.notification.api.Notification;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Notification Object used by the Periodic Query Service to inform 
workers to
+ * process results for a given Periodic Query with the indicated id.
+ * Additionally, this Object contains a period that indicates a frequency 
at
+ * which regular updates are generated.
+ *
+ */
+public class PeriodicNotification implements Notification {
+
+private String id;
+private long period;
+private TimeUnit periodTimeUnit;
+private long initialDelay;
+
+/**
+ * Creates a PeriodicNotification.
+ * @param id - Fluo Query Id that this notification is associated with
+ * @param period - period at which notifications are generated
+ * @param periodTimeUnit - time unit associated with the period and 
delay
+ * @param initialDelay - amount of time to wait before generating the 
first notification
+ */
+public PeriodicNotification(String id, long period, TimeUnit 
periodTimeUnit, long initialDelay) {
+Preconditions.checkNotNull(id);
+Preconditions.checkNotNull(periodTimeUnit);
+Preconditions.checkArgument(period > 0 && initialDelay >= 0);
--- End diff --

here


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service

2017-07-17 Thread isper3at
Github user isper3at commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/177#discussion_r127754539
  
--- Diff: 
extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/AbstractSpanBatchInformation.java
 ---
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.batch;
+
+import java.util.Objects;
+
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.data.Span;
+
+import jline.internal.Preconditions;
+
+/**
+ * Abstract class for generating span based notifications.  A spanned 
notification
+ * uses a {@link Span} to begin processing a Fluo Column at the position 
designated by the Span.
+ *
+ */
+public abstract class AbstractSpanBatchInformation extends 
BasicBatchInformation {
+
+private Span span;
+
+/**
+ * Create AbstractBatchInformation
+ * @param batchSize - size of batch to be processed
+ * @param task - type of task processed (Add, Delete, Udpate)
+ * @param column - Cpolumn that Span notification is applied
+ * @param span - span used to indicate where processing should begin
+ */
+public AbstractSpanBatchInformation(int batchSize, Task task, Column 
column, Span span) {
+super(batchSize, task, column);
+Preconditions.checkNotNull(span);
--- End diff --

this.span = checkNotNull(span)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service

2017-07-17 Thread isper3at
Github user isper3at commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/177#discussion_r127769861
  
--- Diff: 
extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/JoinBatchInformation.java
 ---
@@ -0,0 +1,262 @@
+package org.apache.rya.indexing.pcj.fluo.app.batch;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import java.util.Objects;
+
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.data.Span;
+import org.apache.rya.indexing.pcj.fluo.app.JoinResultUpdater.Side;
+import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata.JoinType;
+import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
+import org.openrdf.query.Binding;
+
+import jline.internal.Preconditions;
+
+/**
+ * This class updates join results based on parameters specified for the 
join's
+ * children. The join has two children, and for one child a 
VisibilityBindingSet
+ * is specified along with the Side of that child. This BindingSet 
represents an
+ * update to that join child. For the other child, a Span, Column and
+ * VariableOrder are specified. This is so that the sibling node (the node 
that
+ * wasn't updated) can be scanned to obtain results that can be joined 
with the
+ * VisibilityBindingSet. The assumption here is that the Span is derived 
from
+ * the {@link Binding}s of common variables between the join children, with
+ * Values ordered according to the indicated {@link VariableOrder}. This 
class
+ * represents a batch order to perform a given task on join BindingSet 
results.
+ * The {@link Task} is to Add, Delete, or Update. This batch order is 
processed
+ * by the {@link BatchObserver} and used with the nodeId provided to the
+ * Observer to process the Task specified by the batch order. If the Task 
is to
+ * add, the BatchBindingSetUpdater returned by
+ * {@link JoinBatchInformation#getBatchUpdater()} will scan the join's 
child for
+ * results using the indicated Span and Column. These results are joined 
with
+ * the indicated VisibilityBindingSet, and the results are added to the 
parent
+ * join. The other Tasks are performed analogously.
+ *
+ */
+public class JoinBatchInformation extends AbstractSpanBatchInformation {
+
+private static final BatchBindingSetUpdater updater = new 
JoinBatchBindingSetUpdater();
+private VisibilityBindingSet bs; //update for join child indicated by 
side
+private VariableOrder varOrder; //variable order for child indicated 
by Span
+private Side side;  //join child that was updated by bs
+private JoinType join;
+/**
+ * @param batchSize - batch size that Tasks are performed in
+ * @param task - Add, Delete, or Update
+ * @param column - Column of join child to be scanned
+ * @param span - span of join child to be scanned (derived from common 
variables of left and right join children)
+ * @param bs - BindingSet to be joined with results of child scan
+ * @param varOrder - VariableOrder used to form join (order for join 
child corresponding to Span)
+ * @param side - The side of the child that the VisibilityBindingSet 
update occurred at
+ * @param join - JoinType (left, right, natural inner)
+ */
+public JoinBatchInformation(int batchSize, Task task, Column column, 
Span span, VisibilityBindingSet bs, VariableOrder varOrder, Side side, JoinType 
join) {
+super(batchSize, task, column, span);
+Preconditions.checkNotNull(bs);
+Preconditions.checkNotNull(varOrder);
+Preconditions.checkNotNull(side);
+Preconditions.checkNotNull(join);
+this.bs = bs;
+this.varOrder = varOrder;
+this.join = join;
+this.side = side;
+}
+
+public JoinBatchInformation(Task task, Column 

[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service

2017-07-17 Thread isper3at
Github user isper3at commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/177#discussion_r127798013
  
--- Diff: 
extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/notification/BasicNotification.java
 ---
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.notification;
+
+import org.apache.rya.periodic.notification.api.Notification;
+
+import com.google.common.base.Objects;
+
+/**
+ * Notification Object used by the Periodic Query Service
+ * to inform workers to process results for a given Periodic
+ * Query with the indicated id.
+ *
+ */
+public class BasicNotification implements Notification {
+
+private String id;
+
+/**
+ * Creates a BasicNotification
+ * @param id - Fluo query id associated with this Notification
+ */
+public BasicNotification(String id) {
+this.id = id;
+}
+
+/**
+ * @return the Fluo Query Id that this notification will generate 
results for
+ */
+@Override
+public String getId() {
+return id;
+}
+
+@Override
+public boolean equals(Object other) {
+if (this == other) {
+return true;
+}
+
+if (other instanceof BasicNotification) {
+BasicNotification not = (BasicNotification) other;
+return Objects.equal(this.id, not.id);
+}
+
+return false;
+}
+
+@Override
+public int hashCode() {
+int result = 17;
--- End diff --

objects.hash()


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service

2017-07-17 Thread isper3at
Github user isper3at commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/177#discussion_r127769759
  
--- Diff: 
extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/JoinBatchInformation.java
 ---
@@ -0,0 +1,262 @@
+package org.apache.rya.indexing.pcj.fluo.app.batch;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import java.util.Objects;
+
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.data.Span;
+import org.apache.rya.indexing.pcj.fluo.app.JoinResultUpdater.Side;
+import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata.JoinType;
+import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
+import org.openrdf.query.Binding;
+
+import jline.internal.Preconditions;
+
+/**
+ * This class updates join results based on parameters specified for the 
join's
+ * children. The join has two children, and for one child a 
VisibilityBindingSet
+ * is specified along with the Side of that child. This BindingSet 
represents an
+ * update to that join child. For the other child, a Span, Column and
+ * VariableOrder are specified. This is so that the sibling node (the node 
that
+ * wasn't updated) can be scanned to obtain results that can be joined 
with the
+ * VisibilityBindingSet. The assumption here is that the Span is derived 
from
+ * the {@link Binding}s of common variables between the join children, with
+ * Values ordered according to the indicated {@link VariableOrder}. This 
class
+ * represents a batch order to perform a given task on join BindingSet 
results.
+ * The {@link Task} is to Add, Delete, or Update. This batch order is 
processed
+ * by the {@link BatchObserver} and used with the nodeId provided to the
+ * Observer to process the Task specified by the batch order. If the Task 
is to
+ * add, the BatchBindingSetUpdater returned by
+ * {@link JoinBatchInformation#getBatchUpdater()} will scan the join's 
child for
+ * results using the indicated Span and Column. These results are joined 
with
+ * the indicated VisibilityBindingSet, and the results are added to the 
parent
+ * join. The other Tasks are performed analogously.
+ *
+ */
+public class JoinBatchInformation extends AbstractSpanBatchInformation {
+
+private static final BatchBindingSetUpdater updater = new 
JoinBatchBindingSetUpdater();
+private VisibilityBindingSet bs; //update for join child indicated by 
side
+private VariableOrder varOrder; //variable order for child indicated 
by Span
+private Side side;  //join child that was updated by bs
+private JoinType join;
+/**
+ * @param batchSize - batch size that Tasks are performed in
+ * @param task - Add, Delete, or Update
+ * @param column - Column of join child to be scanned
+ * @param span - span of join child to be scanned (derived from common 
variables of left and right join children)
+ * @param bs - BindingSet to be joined with results of child scan
+ * @param varOrder - VariableOrder used to form join (order for join 
child corresponding to Span)
+ * @param side - The side of the child that the VisibilityBindingSet 
update occurred at
+ * @param join - JoinType (left, right, natural inner)
+ */
+public JoinBatchInformation(int batchSize, Task task, Column column, 
Span span, VisibilityBindingSet bs, VariableOrder varOrder, Side side, JoinType 
join) {
+super(batchSize, task, column, span);
+Preconditions.checkNotNull(bs);
+Preconditions.checkNotNull(varOrder);
+Preconditions.checkNotNull(side);
+Preconditions.checkNotNull(join);
+this.bs = bs;
--- End diff --

same stuff here


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project do

[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service

2017-07-17 Thread isper3at
Github user isper3at commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/177#discussion_r127792656
  
--- Diff: 
extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/PeriodicQueryNode.java
 ---
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.query;
+
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.rya.indexing.pcj.fluo.app.util.PeriodicQueryUtil;
+import org.openrdf.query.algebra.QueryModelVisitor;
+import org.openrdf.query.algebra.TupleExpr;
+import org.openrdf.query.algebra.UnaryTupleOperator;
+import org.openrdf.query.algebra.evaluation.function.Function;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkArgument;
+
+/**
+ * This is a {@link UnaryTupleOperator} that gets placed in the parsed 
query
+ * {@link TupleExpr} when a {@link Filter} is encountered in the SPARQL 
String that
+ * contains the Periodic {@link Function} {@link 
PeriodicQueryUtil#PeriodicQueryURI}.
+ * The PeiodicQueryNode is created from the arguments passed to the 
Periodic Function,
+ * which consist of a time unit, a temporal period, a temporal window of 
time, and the
+ * temporal variable in the query, which assumes a value indicated by the
+ * Time ontology: http://www.w3.org/2006/time. The purpose of the 
PeriodicQueryNode
+ * is to filter out all events that did not occur within the specified 
window of time
+ * of this instant and to generate notifications at a regular interval 
indicated by the period.
+ *
+ */
+public class PeriodicQueryNode extends UnaryTupleOperator {
+
+private TimeUnit unit;
+private long windowDuration;
+private long periodDuration;
+private String temporalVar;
+
+/**
+ * Creates a PeriodicQueryNode from the specified values.
+ * @param window - specifies the window of time that event must occur 
within from this instant
+ * @param period - regular interval at which notifications are 
generated (must be leq window).
+ * @param unit - time unit of the period and window
+ * @param temporalVar - temporal variable in query used for filtering
+ * @param arg - child of PeriodicQueryNode in parsed query
+ */
+public PeriodicQueryNode(long window, long period, TimeUnit unit, 
String temporalVar, TupleExpr arg) {
+super(arg);
+checkArgument(period <= window);
+checkNotNull(temporalVar);
+checkNotNull(arg);
+checkNotNull(unit);
+this.windowDuration = window;
--- End diff --

= checkNotNull


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service

2017-07-17 Thread isper3at
Github user isper3at commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/177#discussion_r127794413
  
--- Diff: 
extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/batch/serializer/BatchInformationSerializerTest.java
 ---
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.batch.serializer;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Arrays;
+import java.util.Optional;
+
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Span;
+import org.apache.rya.indexing.pcj.fluo.app.JoinResultUpdater.Side;
+import org.apache.rya.indexing.pcj.fluo.app.batch.BatchInformation;
+import org.apache.rya.indexing.pcj.fluo.app.batch.BatchInformation.Task;
+import org.apache.rya.indexing.pcj.fluo.app.batch.JoinBatchInformation;
+import 
org.apache.rya.indexing.pcj.fluo.app.batch.SpanBatchDeleteInformation;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
+import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata.JoinType;
+import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
+import org.junit.Test;
+import org.openrdf.model.impl.URIImpl;
+import org.openrdf.query.algebra.evaluation.QueryBindingSet;
+
+public class BatchInformationSerializerTest {
+
+@Test
+public void testSpanBatchInformationSerialization() {
+
+SpanBatchDeleteInformation batch = 
SpanBatchDeleteInformation.builder().setBatchSize(1000)
+
.setColumn(FluoQueryColumns.PERIODIC_QUERY_BINDING_SET).setSpan(Span.prefix(Bytes.of("prefix"))).build();
+System.out.println(batch);
+byte[] batchBytes = BatchInformationSerializer.toBytes(batch);
+Optional decodedBatch = 
BatchInformationSerializer.fromBytes(batchBytes);
+System.out.println(decodedBatch);
+assertEquals(batch, decodedBatch.get());
+}
+
+@Test
+public void testJoinBatchInformationSerialization() {
+
+QueryBindingSet bs = new QueryBindingSet();
+bs.addBinding("a", new URIImpl("urn:123"));
+bs.addBinding("b", new URIImpl("urn:456"));
+VisibilityBindingSet vBis = new VisibilityBindingSet(bs, "FOUO");
+
+JoinBatchInformation batch = 
JoinBatchInformation.builder().setBatchSize(1000).setTask(Task.Update)
+
.setColumn(FluoQueryColumns.PERIODIC_QUERY_BINDING_SET).setSpan(Span.prefix(Bytes.of("prefix346")))
+
.setJoinType(JoinType.LEFT_OUTER_JOIN).setSide(Side.RIGHT).setVarOrder(new 
VariableOrder(Arrays.asList("a", "b")))
+.setBs(vBis).build();
+
+System.out.println(batch);
--- End diff --

System.println()?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service

2017-07-17 Thread isper3at
Github user isper3at commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/177#discussion_r127798259
  
--- Diff: 
extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/notification/CommandNotification.java
 ---
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.notification;
+
+import org.apache.rya.periodic.notification.api.Notification;
+
+import com.google.common.base.Objects;
+import com.google.common.base.Preconditions;
+
+/**
+ * This Object contains a Notification Object used by the Periodic Query 
Service
+ * to inform workers to process results for a given Periodic Query with the
+ * indicated id. Additionally, the CommandNotification contains a
+ * {@link Command} about which action the
+ * {@link NotificationCoordinatorExecutor} should take (adding or 
deleting).
+ * CommandNotifications are meant to be added to an external work queue 
(such as
+ * Kafka) to be processed by the NotificationCoordinatorExecutor.
+ *
+ */
+public class CommandNotification implements Notification {
+
+private Notification notification;
+private Command command;
+
+public enum Command {
+ADD, DELETE
+};
+
+/**
+ * Creates a new CommandNotification
+ * @param command - the command associated with this notification 
(either add, update, or delete)
+ * @param notification - the underlying notification associated with 
this command
+ */
+public CommandNotification(Command command, Notification notification) 
{
+Preconditions.checkNotNull(notification);
+Preconditions.checkNotNull(command);
--- End diff --

here


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service

2017-07-17 Thread isper3at
Github user isper3at commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/177#discussion_r127769914
  
--- Diff: 
extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/JoinBatchInformation.java
 ---
@@ -0,0 +1,262 @@
+package org.apache.rya.indexing.pcj.fluo.app.batch;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import java.util.Objects;
+
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.data.Span;
+import org.apache.rya.indexing.pcj.fluo.app.JoinResultUpdater.Side;
+import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata.JoinType;
+import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
+import org.openrdf.query.Binding;
+
+import jline.internal.Preconditions;
+
+/**
+ * This class updates join results based on parameters specified for the 
join's
+ * children. The join has two children, and for one child a 
VisibilityBindingSet
+ * is specified along with the Side of that child. This BindingSet 
represents an
+ * update to that join child. For the other child, a Span, Column and
+ * VariableOrder are specified. This is so that the sibling node (the node 
that
+ * wasn't updated) can be scanned to obtain results that can be joined 
with the
+ * VisibilityBindingSet. The assumption here is that the Span is derived 
from
+ * the {@link Binding}s of common variables between the join children, with
+ * Values ordered according to the indicated {@link VariableOrder}. This 
class
+ * represents a batch order to perform a given task on join BindingSet 
results.
+ * The {@link Task} is to Add, Delete, or Update. This batch order is 
processed
+ * by the {@link BatchObserver} and used with the nodeId provided to the
+ * Observer to process the Task specified by the batch order. If the Task 
is to
+ * add, the BatchBindingSetUpdater returned by
+ * {@link JoinBatchInformation#getBatchUpdater()} will scan the join's 
child for
+ * results using the indicated Span and Column. These results are joined 
with
+ * the indicated VisibilityBindingSet, and the results are added to the 
parent
+ * join. The other Tasks are performed analogously.
+ *
+ */
+public class JoinBatchInformation extends AbstractSpanBatchInformation {
+
+private static final BatchBindingSetUpdater updater = new 
JoinBatchBindingSetUpdater();
+private VisibilityBindingSet bs; //update for join child indicated by 
side
+private VariableOrder varOrder; //variable order for child indicated 
by Span
+private Side side;  //join child that was updated by bs
+private JoinType join;
+/**
+ * @param batchSize - batch size that Tasks are performed in
+ * @param task - Add, Delete, or Update
+ * @param column - Column of join child to be scanned
+ * @param span - span of join child to be scanned (derived from common 
variables of left and right join children)
+ * @param bs - BindingSet to be joined with results of child scan
+ * @param varOrder - VariableOrder used to form join (order for join 
child corresponding to Span)
+ * @param side - The side of the child that the VisibilityBindingSet 
update occurred at
+ * @param join - JoinType (left, right, natural inner)
+ */
+public JoinBatchInformation(int batchSize, Task task, Column column, 
Span span, VisibilityBindingSet bs, VariableOrder varOrder, Side side, JoinType 
join) {
+super(batchSize, task, column, span);
+Preconditions.checkNotNull(bs);
+Preconditions.checkNotNull(varOrder);
+Preconditions.checkNotNull(side);
+Preconditions.checkNotNull(join);
+this.bs = bs;
+this.varOrder = varOrder;
+this.join = join;
+this.side = side;
+}
+
+public JoinBatchInformation(Task task, Column 

[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service

2017-07-17 Thread isper3at
Github user isper3at commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/177#discussion_r127798313
  
--- Diff: 
extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/notification/CommandNotification.java
 ---
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.notification;
+
+import org.apache.rya.periodic.notification.api.Notification;
+
+import com.google.common.base.Objects;
+import com.google.common.base.Preconditions;
+
+/**
+ * This Object contains a Notification Object used by the Periodic Query 
Service
+ * to inform workers to process results for a given Periodic Query with the
+ * indicated id. Additionally, the CommandNotification contains a
+ * {@link Command} about which action the
+ * {@link NotificationCoordinatorExecutor} should take (adding or 
deleting).
+ * CommandNotifications are meant to be added to an external work queue 
(such as
+ * Kafka) to be processed by the NotificationCoordinatorExecutor.
+ *
+ */
+public class CommandNotification implements Notification {
+
+private Notification notification;
+private Command command;
+
+public enum Command {
+ADD, DELETE
+};
+
+/**
+ * Creates a new CommandNotification
+ * @param command - the command associated with this notification 
(either add, update, or delete)
+ * @param notification - the underlying notification associated with 
this command
+ */
+public CommandNotification(Command command, Notification notification) 
{
+Preconditions.checkNotNull(notification);
+Preconditions.checkNotNull(command);
+this.command = command;
+this.notification = notification;
+}
+
+@Override
+public String getId() {
+return notification.getId();
+}
+
+/**
+ * Returns {@link Notification} contained by this CommmandNotification.
+ * @return - Notification contained by this Object
+ */
+public Notification getNotification() {
+return this.notification;
+}
+
+/**
+ * @return Command contained by this Object (either add or delete)
+ */
+public Command getCommand() {
+return this.command;
+}
+
+@Override
+public boolean equals(Object other) {
+if (this == other) {
+return true;
+}
+if (other instanceof CommandNotification) {
+CommandNotification cn = (CommandNotification) other;
+return Objects.equal(this.command, cn.command) && 
Objects.equal(this.notification, cn.notification);
+} else {
+return false;
+}
+}
+
+@Override
+public int hashCode() {
+int result = 17;
+result = 31 * result + Objects.hashCode(command);
--- End diff --

objects.hash


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service

2017-07-17 Thread isper3at
Github user isper3at commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/177#discussion_r127798607
  
--- Diff: 
extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/notification/PeriodicNotification.java
 ---
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.notification;
+
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.rya.periodic.notification.api.Notification;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Notification Object used by the Periodic Query Service to inform 
workers to
+ * process results for a given Periodic Query with the indicated id.
+ * Additionally, this Object contains a period that indicates a frequency 
at
+ * which regular updates are generated.
+ *
+ */
+public class PeriodicNotification implements Notification {
+
+private String id;
+private long period;
+private TimeUnit periodTimeUnit;
+private long initialDelay;
+
+/**
+ * Creates a PeriodicNotification.
+ * @param id - Fluo Query Id that this notification is associated with
+ * @param period - period at which notifications are generated
+ * @param periodTimeUnit - time unit associated with the period and 
delay
+ * @param initialDelay - amount of time to wait before generating the 
first notification
+ */
+public PeriodicNotification(String id, long period, TimeUnit 
periodTimeUnit, long initialDelay) {
+Preconditions.checkNotNull(id);
+Preconditions.checkNotNull(periodTimeUnit);
+Preconditions.checkArgument(period > 0 && initialDelay >= 0);
+this.id = id;
+this.period = period;
+this.periodTimeUnit = periodTimeUnit;
+this.initialDelay = initialDelay;
+}
+
+
+/**
+ * Create a PeriodicNotification
+ * @param other - other PeriodicNotification used in copy constructor
+ */
+public PeriodicNotification(PeriodicNotification other) {
+this(other.id, other.period, other.periodTimeUnit, 
other.initialDelay);
+}
+
+public String getId() {
+return id;
+}
+
+/**
+ * @return - period at which regular notifications are generated
+ */
+public long getPeriod() {
+return period;
+}
+
+/**
+ * @return time unit of period and initial delay
+ */
+public TimeUnit getTimeUnit() {
+return periodTimeUnit;
+}
+
+/**
+ * @return amount of time to delay before beginning to generate 
notifications
+ */
+public long getInitialDelay() {
+return initialDelay;
+}
+
+@Override
+public String toString() {
+StringBuilder builder = new StringBuilder();
+String delim = "=";
+String delim2 = ";";
+return 
builder.append("id").append(delim).append(id).append(delim2).append("period").append(delim).append(period).append(delim2)
+
.append("periodTimeUnit").append(delim).append(periodTimeUnit).append(delim2).append("initialDelay").append(delim)
+.append(initialDelay).toString();
+}
+
+@Override
+public boolean equals(Object other) {
+if (this == other) {
+return true;
+}
+
+if (!(other instanceof PeriodicNotification)) {
+return false;
+}
+
+PeriodicNotification notification = (PeriodicNotification) other;
+return Objects.equals(this.id, notification.id) && (this.period == 
notification.period) 
+&& Objects.equals(this.periodTimeUnit, 
notification.periodTimeUnit) && (this.initialDelay == 
notification.initialDelay);
+}

[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service

2017-07-17 Thread isper3at
Github user isper3at commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/177#discussion_r127791538
  
--- Diff: 
extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/PeriodicQueryObserver.java
 ---
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.observers;
+
+import static java.util.Objects.requireNonNull;
+
+import org.apache.fluo.api.client.TransactionBase;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.rya.indexing.pcj.fluo.app.BindingSetRow;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO;
+import org.apache.rya.indexing.pcj.fluo.app.query.PeriodicQueryMetadata;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
+import 
org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSetSerDe;
+
--- End diff --

docs!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service

2017-07-17 Thread isper3at
Github user isper3at commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/177#discussion_r127734010
  
--- Diff: 
extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/AccumuloPeriodicQueryResultStorage.java
 ---
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.storage.accumulo;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchDeleter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.hadoop.io.Text;
+import org.apache.rya.indexing.pcj.storage.PCJIdFactory;
+import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
+import org.apache.rya.indexing.pcj.storage.PeriodicQueryStorageException;
+import org.apache.rya.indexing.pcj.storage.PeriodicQueryStorageMetadata;
+import 
org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator;
+import 
org.apache.rya.indexing.pcj.storage.accumulo.BindingSetConverter.BindingSetConversionException;
+import org.openrdf.model.impl.LiteralImpl;
+import org.openrdf.model.vocabulary.XMLSchema;
+import org.openrdf.query.BindingSet;
+import org.openrdf.query.MalformedQueryException;
+import org.openrdf.query.algebra.AggregateOperatorBase;
+import org.openrdf.query.algebra.ExtensionElem;
+import org.openrdf.query.algebra.TupleExpr;
+import org.openrdf.query.algebra.evaluation.QueryBindingSet;
+import org.openrdf.query.algebra.helpers.QueryModelVisitorBase;
+import org.openrdf.query.parser.sparql.SPARQLParser;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * This class is the Accumulo implementation of {@link 
PeriodicQueryResultStorage} for
+ * creating, deleting, and interacting with tables where PeriodicQuery 
results are stored.
+ */
+public class AccumuloPeriodicQueryResultStorage implements 
PeriodicQueryResultStorage {
+
+private String ryaInstance;
+private Connector accumuloConn;
+private Authorizations auths;
+private final PCJIdFactory pcjIdFactory = new PCJIdFactory();
+private final AccumuloPcjSerializer converter = new 
AccumuloPcjSerializer();
+private static final PcjTables pcjTables = new PcjTables();
+private static final PeriodicQueryTableNameFactory tableNameFactory = 
new PeriodicQueryTableNameFactory();
+
+/**
+ * Creates a AccumuloPeriodicQueryResultStorage Object.
+ * @param accumuloConn - Accumulo Connector for connecting to an 
Accumulo instance
+ * @param ryaInstance - Rya Instance name for connecting to Rya
+ */
+public AccumuloPeriodicQueryResultStorage(Connector accumuloConn, 
String ryaInstance) {
+this.accumuloConn = accumuloConn;
+this.ryaInstance = ryaInstance;
+String user = accumuloConn.whoami();
+try {
+this.auths = 
accumuloConn.securityOperations().getUserAuthorizations(user);
+} catch (AccumuloException | AccumuloSecurityException e) {
+throw new RuntimeException("Unable access user: " + user + 
"authorizations.");
+}
+}
+
+@Override
+public String createPeriodicQuery(String sparql) throws 
PeriodicQueryStorageException {
+Preconditions.checkNotNull(sparql);
+String queryId = pcjIdFactory.nextId();
+ret

[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service

2017-07-17 Thread isper3at
Github user isper3at commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/177#discussion_r127740934
  
--- Diff: 
extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTables.java
 ---
@@ -412,9 +413,9 @@ private void writeResults(
 // Row ID = binding set values, Column Family = variable 
order of the binding set.
 final Mutation addResult = new Mutation(rowKey);
 final String visibility = result.getVisibility();
-addResult.put(varOrder.toString(), "", new 
ColumnVisibility(visibility), "");
+addResult.put(varOrder.toString(), "", new 
ColumnVisibility(visibility), new Value(bsSerDe.serialize(result).toArray()));
 mutations.add(addResult);
-} catch(final BindingSetConversionException e) {
+} catch(Exception e) {
--- End diff --

List out the exceptions being caught here


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service

2017-07-17 Thread isper3at
Github user isper3at commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/177#discussion_r127792369
  
--- Diff: 
extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/PeriodicQueryMetadata.java
 ---
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.query;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
+
+import com.google.common.base.Objects;
+import com.google.common.base.Preconditions;
+
+/**
+ * Metadata that is required for periodic queries in the Rya Fluo 
Application.  
+ * If a periodic query is registered with the Rya Fluo application, the 
BindingSets
+ * are placed into temporal bins according to whether they occur within 
the window of
+ * a period's ending time.  This Metadata is used to create a Bin Id, 
which is equivalent
+ * to the period's ending time, to be inserted into each BindingSet that 
occurs within that
+ * bin.  This is to allow the AggregationUpdater to aggregate the bins by 
grouping on the 
+ * Bin Id.
+ * 
+ */
+public class PeriodicQueryMetadata extends CommonNodeMetadata {
+
+private String parentNodeId;
+private String childNodeId;
+private long windowSize;
+private long period;
+private TimeUnit unit;
+private String temporalVariable;
+
+/**
+ * Constructs an instance of PeriodicQueryMetadata
+ * @param nodeId - id of periodic query node
+ * @param varOrder - variable order indicating the order the 
BindingSet results are written in
+ * @param parentNodeId - id of parent node
+ * @param childNodeId - id of child node
+ * @param windowSize - size of window used for filtering
+ * @param period - period size that indicates frequency of 
notifications
+ * @param unit - TimeUnit corresponding to window and period
+ * @param temporalVariable - temporal variable that periodic 
conditions are applied to
+ */
+public PeriodicQueryMetadata(String nodeId, VariableOrder varOrder, 
String parentNodeId, String childNodeId, long windowSize, long period,
+TimeUnit unit, String temporalVariable) {
+super(nodeId, varOrder);
+Preconditions.checkNotNull(parentNodeId);
+Preconditions.checkNotNull(childNodeId);
+Preconditions.checkNotNull(temporalVariable);
+Preconditions.checkNotNull(unit);
+Preconditions.checkNotNull(period > 0);
--- End diff --

= checkNotNull


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service

2017-07-17 Thread isper3at
Github user isper3at commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/177#discussion_r127798892
  
--- Diff: 
extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/processor/NotificationProcessorExecutor.java
 ---
@@ -0,0 +1,117 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */package org.apache.rya.periodic.notification.processor;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.log4j.Logger;
+import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
+import org.apache.rya.periodic.notification.api.LifeCycle;
+import org.apache.rya.periodic.notification.api.NodeBin;
+import org.apache.rya.periodic.notification.exporter.BindingSetRecord;
+import 
org.apache.rya.periodic.notification.notification.TimestampedNotification;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Executor service that runs {@link TimestampedNotificationProcessor}s 
with basic
+ * functionality for starting, stopping, and determining whether 
notification processors are
+ * being executed. 
+ *
+ */
+public class NotificationProcessorExecutor implements LifeCycle {
+
+private static final Logger log = 
Logger.getLogger(TimestampedNotificationProcessor.class);
+private BlockingQueue notifications; // 
notifications
+private BlockingQueue bins; // entries to delete from Fluo
+private BlockingQueue bindingSets; // query results 
to
+ // export
+private PeriodicQueryResultStorage periodicStorage;
+private List processors;
+private int numberThreads;
+private ExecutorService executor;
+private boolean running = false;
+
+/**
+ * Creates NotificationProcessorExecutor.
+ * @param periodicStorage - storage layer that periodic results are 
read from
+ * @param notifications - notifications are pulled from this queue, 
and the timestamp indicates which bin of results to query for
+ * @param bins - after notifications are processed, they are added to 
the bin to be deleted
+ * @param bindingSets - results read from the storage layer to be 
exported
+ * @param numberThreads - number of threads used for processing
+ */
+public NotificationProcessorExecutor(PeriodicQueryResultStorage 
periodicStorage, BlockingQueue notifications,
+BlockingQueue bins, BlockingQueue 
bindingSets, int numberThreads) {
+Preconditions.checkNotNull(notifications);
+Preconditions.checkNotNull(bins);
+Preconditions.checkNotNull(bindingSets);
+this.notifications = notifications;
--- End diff --

here


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service

2017-07-17 Thread isper3at
Github user isper3at commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/177#discussion_r127771229
  
--- Diff: 
extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/SpanBatchDeleteInformation.java
 ---
@@ -0,0 +1,92 @@
+package org.apache.rya.indexing.pcj.fluo.app.batch;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.data.Span;
+
+/**
+ * This class represents a batch order to delete all entries in the Fluo 
table indicated
+ * by the given Span and Column.  These batch orders are processed by the 
{@link BatchObserver},
+ * which uses this batch information along with the nodeId passed into the 
Observer to perform
+ * batch deletes.  
+ *
+ */
+public class SpanBatchDeleteInformation extends 
AbstractSpanBatchInformation {
+
+private static final BatchBindingSetUpdater updater = new 
SpanBatchBindingSetUpdater();
+
+public SpanBatchDeleteInformation(int batchSize, Column column, Span 
span) {
+super(batchSize, Task.Delete, column, span);
+}
+
+/**
+ * @return Updater that applies the {@link Task} to the given {@link 
Span} and {@link Column}
+ */
+@Override
+public BatchBindingSetUpdater getBatchUpdater() {
+return updater;
+}
+
+
+public static Builder builder() {
+return new Builder();
+}
+
+public static class Builder {
+
+private int batchSize = DEFAULT_BATCH_SIZE;
+private Column column;
+private Span span;
+
+/**
+ * @param batchSize - {@link Task}s are applied in batches of this 
size
+ */
+public Builder setBatchSize(int batchSize) {
+this.batchSize = batchSize;
+return this;
+}
+
+/**
+ * Sets column to apply batch {@link Task} to
+ * @param column - column batch Task will be applied to
+ * @return
+ */
+public Builder setColumn(Column column) {
+this.column = column;
+return this;
+}
+
+/**
+ * @param span - span that batch {@link Task} will be applied to
+ *
+ */
+public Builder setSpan(Span span) {
+this.span = span;
+return this;
+}
+
+
+public SpanBatchDeleteInformation build() {
--- End diff --

docs


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service

2017-07-17 Thread isper3at
Github user isper3at commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/177#discussion_r127742123
  
--- Diff: 
extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/VisibilityBindingSetSerDe.java
 ---
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.storage.accumulo;
+
+import static java.util.Objects.requireNonNull;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+
+import org.apache.fluo.api.data.Bytes;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Serializes and deserializes a {@link VisibilityBindingSet} to and from 
{@link Bytes} objects.
+ */
+@DefaultAnnotation(NonNull.class)
+public class VisibilityBindingSetSerDe {
--- End diff --

Call this an adapter or something like that.  also: implements Serializable?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service

2017-07-17 Thread isper3at
Github user isper3at commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/177#discussion_r127771163
  
--- Diff: 
extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/SpanBatchDeleteInformation.java
 ---
@@ -0,0 +1,92 @@
+package org.apache.rya.indexing.pcj.fluo.app.batch;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.data.Span;
+
+/**
+ * This class represents a batch order to delete all entries in the Fluo 
table indicated
+ * by the given Span and Column.  These batch orders are processed by the 
{@link BatchObserver},
+ * which uses this batch information along with the nodeId passed into the 
Observer to perform
+ * batch deletes.  
+ *
+ */
+public class SpanBatchDeleteInformation extends 
AbstractSpanBatchInformation {
+
+private static final BatchBindingSetUpdater updater = new 
SpanBatchBindingSetUpdater();
+
+public SpanBatchDeleteInformation(int batchSize, Column column, Span 
span) {
+super(batchSize, Task.Delete, column, span);
+}
+
+/**
+ * @return Updater that applies the {@link Task} to the given {@link 
Span} and {@link Column}
+ */
+@Override
+public BatchBindingSetUpdater getBatchUpdater() {
+return updater;
+}
+
+
+public static Builder builder() {
--- End diff --

docs


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service

2017-07-17 Thread isper3at
Github user isper3at commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/177#discussion_r127732487
  
--- Diff: 
extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/PeriodicQueryStorageMetadata.java
 ---
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.storage;
+
+import java.util.Objects;
+
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
+
+import com.google.common.base.Preconditions;
+
+/**
+ *  Metadata for a given PeriodicQueryStorage table. 
+ */
+public class PeriodicQueryStorageMetadata {
+
+private String sparql;
+private VariableOrder varOrder;
+
+/**
+ * Create a PeriodicQueryStorageMetadat object
+ * @param sparql - SPARQL query whose results are stored in table
+ * @param varOrder - order that BindingSet values are written in in 
table
+ */
+public PeriodicQueryStorageMetadata(String sparql, VariableOrder 
varOrder) {
+Preconditions.checkNotNull(sparql);
--- End diff --

this can be simplified to this.sparql = checkNotNull(sparql) and the others 
here too


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service

2017-07-17 Thread isper3at
Github user isper3at commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/177#discussion_r127796542
  
--- Diff: 
extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplication.java
 ---
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.application;
+
+import org.apache.log4j.Logger;
+import org.apache.rya.indexing.pcj.fluo.app.util.PeriodicQueryUtil;
+import org.apache.rya.periodic.notification.api.BinPruner;
+import org.apache.rya.periodic.notification.api.LifeCycle;
+import org.apache.rya.periodic.notification.api.NodeBin;
+import 
org.apache.rya.periodic.notification.api.NotificationCoordinatorExecutor;
+import org.apache.rya.periodic.notification.exporter.BindingSetRecord;
+import org.apache.rya.periodic.notification.exporter.KafkaExporterExecutor;
+import 
org.apache.rya.periodic.notification.processor.NotificationProcessorExecutor;
+import 
org.apache.rya.periodic.notification.pruner.PeriodicQueryPrunerExecutor;
+import 
org.apache.rya.periodic.notification.registration.kafka.KafkaNotificationProvider;
+import org.openrdf.query.algebra.evaluation.function.Function;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * The PeriodicNotificationApplication runs the key components of the 
Periodic
+ * Query Service. It consists of a {@link KafkaNotificationProvider}, a
+ * {@link NotificationCoordinatorExecutor}, a
+ * {@link NotificationProcessorExecutor}, a {@link KafkaExporterExecutor}, 
and a
+ * {@link PeriodicQueryPrunerExecutor}. These services run in coordination 
with
+ * one another to perform the following tasks in the indicated order: 
+ * Retrieve new requests to generate periodic notifications from Kafka
+ * Register them with the {@link NotificationCoordinatorExecutor} to
+ * generate the periodic notifications
+ * As notifications are generated, they are added to a work queue that 
is
+ * monitored by the {@link NotificationProcessorExecutor}.
+ * The processor processes the notifications by reading all of the 
query
+ * results corresponding to the bin and query id indicated by the 
notification.
+ * After reading the results, the processor adds a {@link 
BindingSetRecord}
+ * to a work queue monitored by the {@link KafkaExporterExecutor}.
+ * The processor then adds a {@link NodeBin} to a workqueue monitored 
by the
+ * {@link BinPruner}
+ * The exporter processes the BindingSetRecord by exporing the result 
to
+ * Kafka
+ * The BinPruner processes the NodeBin by cleaning up the results for 
the
+ * indicated bin and query in Accumulo and Fluo. 
+ * 
+ * The purpose of this Periodic Query Service is to facilitate the ability 
to
+ * answer Periodic Queries using the Rya Fluo application, where a Periodic
+ * Query is any query requesting periodic updates about events that 
occurred
+ * within a given window of time of this instant. This is also known as a
+ * rolling window query. Period Queries can be expressed using SPARQL by
+ * including the {@link Function} indicated by the URI
+ * {@link PeriodicQueryUtil#PeriodicQueryURI}. The user must provide this
+ * Function with the following arguments: the temporal variable in the 
query
+ * that will be filtered on, the window of time that events must occur 
within,
+ * the period at which the user wants to receive updates, and the time 
unit. The
+ * following query requests all observations that occurred within the last
+ * minute and requests updates every 15 seconds. It also performs a count 
on
+ * those observations. 
+ * 
+ * prefix function: http://org.apache.rya/function#
+ * "prefix time: http://www.w3.org/2006/time#
+ * "select (count(?obs) as ?total) where {
+ * "Filter(function:periodic(?time, 1, .25, time:minutes))
+ * "?obs uri:hasTime ?time.
+ * "?obs uri

[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service

2017-07-17 Thread isper3at
Github user isper3at commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/177#discussion_r127754934
  
--- Diff: 
extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/BasicBatchInformation.java
 ---
@@ -0,0 +1,82 @@
+package org.apache.rya.indexing.pcj.fluo.app.batch;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import org.apache.fluo.api.data.Column;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * This class contains all of the common info contained in other 
implementations
+ * of BatchInformation.
+ *
+ */
+public abstract class BasicBatchInformation implements BatchInformation {
+
+private int batchSize;
+private Task task;
+private Column column;
+
+/**
+ * Create BasicBatchInformation object
+ * @param batchSize - size of batch to be processed
+ * @param task - task to be processed
+ * @param column - Column in which data is proessed
+ */
+public BasicBatchInformation(int batchSize, Task task, Column column ) 
{
+Preconditions.checkNotNull(task);
--- End diff --

this, = not null()


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service

2017-07-17 Thread isper3at
Github user isper3at commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/177#discussion_r127770676
  
--- Diff: 
extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/SpanBatchBindingSetUpdater.java
 ---
@@ -0,0 +1,128 @@
+package org.apache.rya.indexing.pcj.fluo.app.batch;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import java.util.Iterator;
+import java.util.Optional;
+
+import org.apache.fluo.api.client.TransactionBase;
+import org.apache.fluo.api.client.scanner.ColumnScanner;
+import org.apache.fluo.api.client.scanner.RowScanner;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.data.ColumnValue;
+import org.apache.fluo.api.data.RowColumn;
+import org.apache.fluo.api.data.Span;
+import org.apache.log4j.Logger;
+import org.apache.rya.indexing.pcj.fluo.app.batch.BatchInformation.Task;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * This class processes {@link SpanBatchDeleteInformation} objects by
+ * deleting the entries in the Fluo Column corresponding to the {@link 
Span}
+ * of the BatchInformation object.  This class will delete entries until 
the
+ * batch size is met, and then create a new SpanBatchDeleteInformation 
object
+ * with an updated Span whose starting point is the stopping point of this
+ * batch.  If the batch limit is not met, then a new batch is not created 
and
+ * the task is complete.
+ *
+ */
+public class SpanBatchBindingSetUpdater extends 
AbstractBatchBindingSetUpdater {
+
+private static final Logger log = 
Logger.getLogger(SpanBatchBindingSetUpdater.class);
+
+/**
+ * Process SpanBatchDeleteInformation objects by deleting all entries 
indicated
+ * by Span until batch limit is met.
+ * @param tx - Fluo Transaction
+ * @param row - Byte row identifying BatchInformation
+ * @param batch - SpanBatchDeleteInformation object to be processed
+ */
+@Override
+public void processBatch(TransactionBase tx, Bytes row, 
BatchInformation batch) throws Exception {
+super.processBatch(tx, row, batch);
+Preconditions.checkArgument(batch instanceof 
SpanBatchDeleteInformation);
+SpanBatchDeleteInformation spanBatch = 
(SpanBatchDeleteInformation) batch;
+Task task = spanBatch.getTask();
+int batchSize = spanBatch.getBatchSize();
+Span span = spanBatch.getSpan();
+Column column = batch.getColumn();
+Optional rowCol = Optional.empty();
+
+switch (task) {
+case Add:
+log.trace("The Task Add is not supported for 
SpanBatchBindingSetUpdater.  Batch " + batch + " will not be processed.");
--- End diff --

throw OperationUnsupportedException?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service

2017-07-17 Thread isper3at
Github user isper3at commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/177#discussion_r127791396
  
--- Diff: 
extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaBindingSetExporter.java
 ---
@@ -36,10 +36,11 @@
  * Incrementally exports SPARQL query results to Kafka topics.
  */
 public class KafkaBindingSetExporter implements 
IncrementalBindingSetExporter {
+
--- End diff --

no significant changes here...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service

2017-07-17 Thread isper3at
Github user isper3at commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/177#discussion_r127741029
  
--- Diff: 
extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/PeriodicQueryTableNameFactory.java
 ---
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.storage.accumulo;
+
+import static java.util.Objects.requireNonNull;
+
+import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
+
+/**
+ * Class for creating the names of {@link PeriodicQueryResultStorage} 
tables.
+ *
+ */
+public class PeriodicQueryTableNameFactory {
+
+public static final String PeriodicTableSuffix = "PERIODIC_QUERY_";
--- End diff --

if this is a suffix, why end with an underscore?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service

2017-07-17 Thread isper3at
Github user isper3at commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/177#discussion_r127799662
  
--- Diff: 
extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/pruner/PeriodicQueryPruner.java
 ---
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.pruner;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.fluo.api.client.FluoClient;
+import org.apache.fluo.api.client.Snapshot;
+import org.apache.fluo.api.client.SnapshotBase;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.log4j.Logger;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
+import org.apache.rya.indexing.pcj.fluo.app.util.PeriodicQueryUtil;
+import org.apache.rya.periodic.notification.api.BinPruner;
+import org.apache.rya.periodic.notification.api.NodeBin;
+
+import jline.internal.Preconditions;
+
+/**
+ * Implementation of {@link BinPruner} that deletes old, already processed
+ * Periodic Query results from Fluo and the PCJ table to which the Fluo 
results
+ * are exported.
+ *
+ */
+public class PeriodicQueryPruner implements BinPruner, Runnable {
+
+private static final Logger log = 
Logger.getLogger(PeriodicQueryPruner.class);
+private FluoClient client;
+private AccumuloBinPruner accPruner;
+private FluoBinPruner fluoPruner;
+private BlockingQueue bins;
+private AtomicBoolean closed = new AtomicBoolean(false);
+private int threadNumber;
+
+public PeriodicQueryPruner(FluoBinPruner fluoPruner, AccumuloBinPruner 
accPruner, FluoClient client, BlockingQueue bins, int threadNumber) {
+Preconditions.checkNotNull(fluoPruner);
+Preconditions.checkNotNull(accPruner);
+Preconditions.checkNotNull(client);
+this.client = client;
--- End diff --

here


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service

2017-07-17 Thread isper3at
Github user isper3at commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/177#discussion_r127750682
  
--- Diff: 
extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/DeletePcj.java
 ---
@@ -67,7 +68,8 @@
 /**
  * Constructs an instance of {@link DeletePcj}.
  *
- * @param batchSize - The number of entries that will be deleted at a 
time. (> 0)
+ * @param batchSize
+ *- The number of entries that will be deleted at a time. 
(> 0)
--- End diff --

I don't like the reformatting of these docs.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service

2017-07-17 Thread isper3at
Github user isper3at commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/177#discussion_r127753530
  
--- Diff: 
extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/AbstractBatchBindingSetUpdater.java
 ---
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.batch;
+
+import org.apache.fluo.api.client.TransactionBase;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.RowColumn;
+import org.apache.fluo.api.data.Span;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
+
+public abstract class AbstractBatchBindingSetUpdater implements 
BatchBindingSetUpdater {
--- End diff --

docs


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service

2017-07-17 Thread isper3at
Github user isper3at commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/177#discussion_r127731724
  
--- Diff: 
extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/PeriodicQueryStorageException.java
 ---
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.storage;
+
+public class PeriodicQueryStorageException extends Exception {
--- End diff --

doc when/where/why this exception gets used


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service

2017-07-17 Thread isper3at
Github user isper3at commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/177#discussion_r127751572
  
--- Diff: 
extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/JoinResultUpdater.java
 ---
@@ -43,6 +43,7 @@
 import 
org.apache.rya.indexing.pcj.storage.accumulo.BindingSetConverter.BindingSetConversionException;
 import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
 import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
+import 
org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSetSerDe;
--- End diff --

are you actually using this?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service

2017-07-14 Thread meiercaleb
GitHub user meiercaleb opened a pull request:

https://github.com/apache/incubator-rya/pull/177

RYA-280-Periodic Query Service


This pull request contains four main components.  The first component
is the PeriodicQueryMetadata and observer framework for Fluo.  This 
capability
adds a periodic bin id to BindingSets as they percolate through Fluo.  The 
second component is a
PeriodicQueryResultStorage API, which essentially wraps existing PCJTables 
class and allows results to be queried based on their periodic bin id.  The 
third capability is the external PeriodicQueryService that generates periodic 
notifications.  When a notification is generated, it uses the time stamp of 
that notification to poll the PeriodicQueryResultStorage table for results 
whose periodic bin matches the notification's time stamp.  It then exports the 
matching results to Kafka.  The fourth and final component is a Batch 
Processing framework that allows Fluo to batch any process or task that could 
lead to an out of memory exception.  This can occur when Fluo issues a scan to 
its underlying table in the middle of 
a transaction.  Using the Batch framework, a BatchInformation object is 
written to the BatchObserver column (the BatchInformation objects consist of a 
task (add, delete, update), a batch size, the column that the task will be 
applied to, and the query node id that it will be applied to).   Additional 
information is included as well, such as the Span (range of data that task is 
applied to).  When the BatchObserver processes a BatchInformation object, it 
performs the indicated task until it reaches the batch size, at which point it 
creates a new BatchInformation object with info detailing the stopping point of 
the last processing stage.  That BatchInformation object is then fed back to 
the BatchObserver, and this process continues until the task can be completed 
before reaching the batch size.

### Tests
Integration tests were written to test that the PeriodicQueryMetadata
was being inserted correctly into Fluo, that BindingSets were being binned
correctly, and that all components of the external PeriodicQueryService were
operating as expected.  Integration tests can be found in 
rya.periodic.service.integration.tests
and rya.pcj.fluo.integration.

### Links
[Jira](https://issues.apache.org/jira/browse/RYA-280)

### Checklist
- [ ] Code Review
- [X] Squash Commits

 People To Reivew
Andrew Smith, Aaron Mihalik


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/meiercaleb/incubator-rya RYA-280

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-rya/pull/177.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #177


commit 4b21c1aa3541172905a2d0415d07cae1ae3a
Author: Caleb Meier 
Date:   2017-04-15T02:20:25Z

RYA-280-Periodic Query Service




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---